1 module connection.connection; 2 3 import utils.debugging : debugPrint; 4 import std.conv : to; 5 import std.socket : Socket, AddressFamily, SocketType, ProtocolType, parseAddress, SocketFlags, Address; 6 import core.thread : Thread; 7 import std.stdio : writeln, File; 8 import std.json : JSONValue, parseJSON, JSONException, JSONType, toJSON; 9 import std.string : cmp; 10 import handlers.handler; 11 import listeners.listener; 12 import server.server; 13 import handlers.response; 14 15 16 public final class BesterConnection : Thread 17 { 18 19 /* The socket to the client */ 20 private Socket clientConnection; 21 22 /* The server backend */ 23 public BesterServer server; 24 25 /* The client's credentials */ 26 private string username; 27 private string password; 28 29 /* The connection scope */ 30 public enum Scope 31 { 32 CLIENT, 33 SERVER, 34 UNKNOWN 35 } 36 37 /* The type of this connection */ 38 private Scope connectionType = Scope.UNKNOWN; 39 40 public Scope getType() 41 { 42 return connectionType; 43 } 44 45 this(Socket clientConnection, BesterServer server) 46 { 47 /* Save socket and set thread worker function pointer */ 48 super(&run); 49 this.clientConnection = clientConnection; 50 this.server = server; 51 52 debugPrint("New client handler spawned for " ~ clientConnection.remoteAddress().toAddrString()); 53 } 54 55 override public string toString() 56 { 57 return clientConnection.remoteAddress().toAddrString(); 58 } 59 60 public string[] getCredentials() 61 { 62 return [username, password]; 63 } 64 65 /* Send a message to the user/server */ 66 public void sendMessage(JSONValue replyMessage) 67 { 68 /* TODO: Implement me */ 69 } 70 71 /* Read/send loop */ 72 private void run() 73 { 74 while(true) 75 { 76 /* Receive buffer */ 77 byte[] buffer; 78 79 /* Byte counter for loop-consumer */ 80 uint currentByte = 0; 81 82 /* Bytes received counter */ 83 long bytesReceived; 84 85 /* TODO: Add fix here to loop for bytes */ 86 while(currentByte < 4) 87 { 88 /* Size buffer */ 89 byte[4] tempBuffer; 90 91 /* Read at most 4 bytes */ 92 bytesReceived = clientConnection.receive(tempBuffer); 93 94 if(!(bytesReceived > 0)) 95 { 96 /* TODO: Handle error here */ 97 debugPrint("Error with receiving"); 98 return; 99 } 100 else 101 { 102 /** 103 * Read the bytes from the temp buffer (as many as was received) 104 * and append them to the *real* buffer. 105 */ 106 buffer ~= tempBuffer[0..bytesReceived]; 107 108 /* Increment the byte counter */ 109 currentByte += bytesReceived; 110 } 111 } 112 113 /* Get the message length */ 114 int messageLength = *(cast(int*)buffer.ptr); 115 writeln("Message length: ", cast(uint)messageLength); 116 117 /* TODO: Testing locally ain't good as stuff arrives way too fast, although not as fast as I can type */ 118 /* What must happen is a loop to loop and wait for data */ 119 120 /* Full message buffer */ 121 byte[] messageBuffer; 122 123 124 /* TODO: Add timeout if we haven't received a message in a certain amount of time */ 125 126 /* Reset the current byte counter */ 127 currentByte = 0; 128 129 while(currentByte < messageLength) 130 { 131 /** 132 * Receive 20 bytes (at most) at a time and don't dequeue from 133 * the kernel's TCP stack's buffer. 134 */ 135 byte[20] messageBufferPartial; 136 bytesReceived = clientConnection.receive(messageBufferPartial, SocketFlags.PEEK); 137 138 /* Check for receive error */ 139 if(!(bytesReceived > 0)) 140 { 141 debugPrint("Error receiving"); 142 return; 143 } 144 else 145 { 146 /* TODO: Make sure we only take [0, messageLength) bytes */ 147 if(cast(uint)bytesReceived+currentByte > messageLength) 148 { 149 byte[] remainingBytes; 150 remainingBytes.length = messageLength-currentByte; 151 152 clientConnection.receive(remainingBytes); 153 154 /* Increment counter of received bytes */ 155 currentByte += remainingBytes.length; 156 157 /* Append the received bytes to the FULL message buffer */ 158 messageBuffer ~= remainingBytes; 159 160 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 161 } 162 else 163 { 164 /* Increment counter of received bytes */ 165 currentByte += bytesReceived; 166 167 168 /* Append the received bytes to the FULL message buffer */ 169 messageBuffer ~= messageBufferPartial[0..bytesReceived]; 170 171 /* TODO: Bug when over send, we must not allow this */ 172 173 174 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 175 176 clientConnection.receive(messageBufferPartial); 177 } 178 179 180 } 181 } 182 183 /* Process the message */ 184 processMessage(messageBuffer); 185 } 186 } 187 188 /* TODO: Pass in type and just payload or what */ 189 private bool dispatch(string payloadType, JSONValue payload) 190 { 191 /* TODO: Implement me */ 192 debugPrint("Dispatching payload [" ~ payloadType ~ "]"); 193 debugPrint("Payload: " ~ payload.toPrettyString()); 194 195 /* Status of dispatch */ 196 bool dispatchStatus = true; 197 198 /* Lookup the payloadType handler */ 199 MessageHandler chosenHandler; 200 201 for(uint i = 0; i < server.handlers.length; i++) 202 { 203 if(cmp(server.handlers[i].getPluginName(), payloadType) == 0) 204 { 205 chosenHandler = server.handlers[i]; 206 break; 207 } 208 } 209 210 /* Check if a handler was found */ 211 if(chosenHandler) 212 { 213 /* If a handler for the message type was found */ 214 215 /* TODO: Send and receive data here */ 216 217 /* Handler's UNIX domain socket */ 218 /* TODO: Change this call here below (also remove startup connection) */ 219 Socket handlerSocket = chosenHandler.getNewSocket(); 220 //writeln(handlerSocket == null); 221 debugPrint("chosenHandler.socketPath: " ~ chosenHandler.socketPath); 222 223 /* Get the payload as a string */ 224 string payloadString = toJSON(payload); 225 226 227 /* Construct the data to send */ 228 byte[] sendBuffer; 229 230 /* TODO: Add 4 bytes of payload length encded in little endian */ 231 int payloadLength = cast(int)payloadString.length; 232 byte* lengthBytes = cast(byte*)&payloadLength; 233 sendBuffer ~= *(lengthBytes+0); 234 sendBuffer ~= *(lengthBytes+1); 235 sendBuffer ~= *(lengthBytes+2); 236 sendBuffer ~= *(lengthBytes+3); 237 238 /* Add the string bytes */ 239 sendBuffer ~= cast(byte[])payloadString; 240 241 /* TODO: Send payload */ 242 writeln("Send buffer: ", sendBuffer); 243 244 debugPrint("Sending payload over to handler for \"" ~ chosenHandler.getPluginName() ~ "\"."); 245 handlerSocket.send(sendBuffer); 246 247 248 /* TODO: Get response */ 249 debugPrint("Waiting for response from handler for \"" ~ chosenHandler.getPluginName() ~ "\"."); 250 251 /* Construct a buffer to receive into */ 252 byte[] receiveBuffer; 253 254 /* The current byte */ 255 uint currentByte = 0; 256 257 /* The amount of bytes received */ 258 long bytesReceived; 259 260 /* Loop consume the next 4 bytes */ 261 while(currentByte < 4) 262 { 263 /* Temporary buffer */ 264 byte[4] tempBuffer; 265 266 /* Read at-most 4 bytes */ 267 bytesReceived = handlerSocket.receive(tempBuffer); 268 269 /* If there was an error reading from the socket */ 270 if(!(bytesReceived > 0)) 271 { 272 /* TODO: Error handling */ 273 debugPrint("Error receiving from UNIX domain socket"); 274 } 275 /* If there is no error reading from the socket */ 276 else 277 { 278 /* Add the read bytes to the *real* buffer */ 279 receiveBuffer ~= tempBuffer[0..bytesReceived]; 280 281 /* Increment the byte counter */ 282 currentByte += bytesReceived; 283 } 284 } 285 286 /* Response message length */ 287 int messageLength = *cast(int*)receiveBuffer.ptr; 288 writeln("Message length is: ", cast(uint)messageLength); 289 290 /* Response message buffer */ 291 byte[] fullMessage; 292 293 /* Reset the byte counter */ 294 currentByte = 0; 295 296 while(currentByte < messageLength) 297 { 298 debugPrint("dhjkh"); 299 300 /** 301 * Receive 20 bytes (at most) at a time and don't dequeue from 302 * the kernel's TCP stack's buffer. 303 */ 304 byte[20] tempBuffer; 305 bytesReceived = handlerSocket.receive(tempBuffer, SocketFlags.PEEK); 306 307 /* Check for an error whilst receiving */ 308 if(!(bytesReceived > 0)) 309 { 310 /* TODO: Error handling */ 311 debugPrint("Error whilst receiving from unix domain socket"); 312 } 313 else 314 { 315 /* TODO: Make sure we only take [0, messageLength) bytes */ 316 if(cast(uint)bytesReceived+currentByte > messageLength) 317 { 318 byte[] remainingBytes; 319 remainingBytes.length = messageLength-currentByte; 320 321 handlerSocket.receive(remainingBytes); 322 323 /* Increment counter of received bytes */ 324 currentByte += remainingBytes.length; 325 326 /* Append the received bytes to the FULL message buffer */ 327 fullMessage ~= remainingBytes; 328 329 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 330 } 331 else 332 { 333 /* Increment counter of received bytes */ 334 currentByte += bytesReceived; 335 336 337 /* Append the received bytes to the FULL message buffer */ 338 fullMessage ~= tempBuffer[0..bytesReceived]; 339 340 /* TODO: Bug when over send, we must not allow this */ 341 342 343 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 344 345 handlerSocket.receive(tempBuffer); 346 } 347 } 348 } 349 350 351 writeln("MEssage ", fullMessage); 352 353 //int messageLength = 0; 354 355 /* TODO: Loop for collect message */ 356 357 /* TODO: So now we have to think about what the hell it means 358 * for a response to be received, like cool and all, but we need 359 * the server to now do something. 360 */ 361 362 363 /* TODO: Set dispatchStatus */ 364 } 365 else 366 { 367 /* TODO: Error handling */ 368 debugPrint("No message handler for payload type \"" ~ payloadType ~ "\" found."); 369 dispatchStatus = false; 370 } 371 372 /* TODO: Set return value */ 373 debugPrint("Dispatch status: " ~ to!(string)(dispatchStatus)); 374 375 return dispatchStatus; 376 } 377 378 379 private JSONValue handlerRun(MessageHandler chosenHandler, JSONValue payload) 380 { 381 /* Handler's UNIX domain socket */ 382 Socket handlerSocket = chosenHandler.getNewSocket(); 383 384 385 /* Get the payload as a string */ 386 string payloadString = toJSON(payload); 387 388 /* Construct the data to send */ 389 byte[] sendBuffer; 390 391 /* TODO: Add 4 bytes of payload length encded in little endian */ 392 int payloadLength = cast(int)payloadString.length; 393 byte* lengthBytes = cast(byte*)&payloadLength; 394 sendBuffer ~= *(lengthBytes+0); 395 sendBuffer ~= *(lengthBytes+1); 396 sendBuffer ~= *(lengthBytes+2); 397 sendBuffer ~= *(lengthBytes+3); 398 399 /* Add the string bytes */ 400 sendBuffer ~= cast(byte[])payloadString; 401 402 /* TODO: Send payload */ 403 writeln("Send buffer: ", sendBuffer); 404 405 debugPrint("Sending payload over to handler for \"" ~ chosenHandler.getPluginName() ~ "\"."); 406 handlerSocket.send(sendBuffer); 407 408 409 /* TODO: Get response */ 410 debugPrint("Waiting for response from handler for \"" ~ chosenHandler.getPluginName() ~ "\"."); 411 412 /* Construct a buffer to receive into */ 413 byte[] receiveBuffer; 414 415 /* The current byte */ 416 uint currentByte = 0; 417 418 /* The amount of bytes received */ 419 long bytesReceived; 420 421 /* Loop consume the next 4 bytes */ 422 while(currentByte < 4) 423 { 424 /* Temporary buffer */ 425 byte[4] tempBuffer; 426 427 /* Read at-most 4 bytes */ 428 bytesReceived = handlerSocket.receive(tempBuffer); 429 430 /* If there was an error reading from the socket */ 431 if(!(bytesReceived > 0)) 432 { 433 /* TODO: Error handling */ 434 debugPrint("Error receiving from UNIX domain socket"); 435 } 436 /* If there is no error reading from the socket */ 437 else 438 { 439 /* Add the read bytes to the *real* buffer */ 440 receiveBuffer ~= tempBuffer[0..bytesReceived]; 441 442 /* Increment the byte counter */ 443 currentByte += bytesReceived; 444 } 445 } 446 447 /* Response message length */ 448 int messageLength = *cast(int*)receiveBuffer.ptr; 449 writeln("Message length is: ", cast(uint)messageLength); 450 451 /* Response message buffer */ 452 byte[] fullMessage; 453 454 /* Reset the byte counter */ 455 currentByte = 0; 456 457 while(currentByte < messageLength) 458 { 459 /** 460 * Receive 20 bytes (at most) at a time and don't dequeue from 461 * the kernel's TCP stack's buffer. 462 */ 463 byte[20] tempBuffer; 464 bytesReceived = handlerSocket.receive(tempBuffer, SocketFlags.PEEK); 465 466 /* Check for an error whilst receiving */ 467 if(!(bytesReceived > 0)) 468 { 469 /* TODO: Error handling */ 470 debugPrint("Error whilst receiving from unix domain socket"); 471 } 472 else 473 { 474 /* TODO: Make sure we only take [0, messageLength) bytes */ 475 if(cast(uint)bytesReceived+currentByte > messageLength) 476 { 477 byte[] remainingBytes; 478 remainingBytes.length = messageLength-currentByte; 479 480 handlerSocket.receive(remainingBytes); 481 482 /* Increment counter of received bytes */ 483 currentByte += remainingBytes.length; 484 485 /* Append the received bytes to the FULL message buffer */ 486 fullMessage ~= remainingBytes; 487 488 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 489 } 490 else 491 { 492 /* Increment counter of received bytes */ 493 currentByte += bytesReceived; 494 495 496 /* Append the received bytes to the FULL message buffer */ 497 fullMessage ~= tempBuffer[0..bytesReceived]; 498 499 /* TODO: Bug when over send, we must not allow this */ 500 501 502 writeln("Received ", currentByte, "/", cast(uint)messageLength, " bytes"); 503 504 handlerSocket.receive(tempBuffer); 505 } 506 } 507 } 508 509 510 writeln("MEssage ", fullMessage); 511 512 //int messageLength = 0; 513 514 /* TODO: Loop for collect message */ 515 516 /* TODO: So now we have to think about what the hell it means 517 * for a response to be received, like cool and all, but we need 518 * the server to now do something. 519 */ 520 521 522 /* TODO: Set dispatchStatus */ 523 return parseJSON(cast(string)fullMessage); 524 } 525 526 527 528 /* TODO: Version 2 of message dispatcher */ 529 private bool dispatchMessage(Scope scopeField, JSONValue payloadBlock) 530 { 531 /* Status of dispatch */ 532 bool dispatchStatus = true; 533 534 /* TODO: Bounds checking, type checking */ 535 536 /* Get the payload type */ 537 string payloadType = payloadBlock["type"].str; 538 debugPrint("Payload type is \"" ~ payloadType ~ "\""); 539 540 /* Get the payload data */ 541 JSONValue payloadData = payloadBlock["data"]; 542 543 /* Lookup the payloadType handler */ 544 MessageHandler chosenHandler = server.findHandler(payloadType); 545 546 /* Check if the payload is a built-in command */ 547 if(cmp(payloadType, "builtin") == 0) 548 { 549 /* TODO: Implement me */ 550 debugPrint("Built-in payload type"); 551 552 /** 553 * Built-in commands follow the structure of 554 * "command" : {"type" : "cmdType", "command" : ...} 555 */ 556 JSONValue commandBlock = payloadData["command"]; 557 string commandType = commandBlock["type"].str; 558 JSONValue command = commandBlock["args"]; 559 560 /* If the command is `close` */ 561 if(cmp(commandType, "close") == 0) 562 { 563 debugPrint("Closing socket..."); 564 this.clientConnection.close(); 565 } 566 else 567 { 568 debugPrint("Invalid built-in command type"); 569 /* TODO: Generate error response */ 570 } 571 } 572 /* If an external handler is found (i.e. not a built-in command) */ 573 else if(chosenHandler) 574 { 575 /* TODO: Implement me */ 576 debugPrint("Chosen handler for payload type \"" ~ payloadType ~ "\" is " ~ chosenHandler.getPluginName()); 577 578 try 579 { 580 /* TODO: Collect return value */ 581 HandlerResponse handlerResponse = new HandlerResponse(handlerRun(chosenHandler, payloadData)); 582 583 /* TODO: Continue here, we will make all error handling do on construction as to make this all more compact */ 584 debugPrint("<<< Message Handler [" ~ chosenHandler.getPluginName() ~ "] response >>>\n\n" ~ handlerResponse.toString()); 585 586 /* Execute the message handler's command */ 587 handlerResponse.execute(this); 588 } 589 catch(ResponseError e) 590 { 591 /* In the case of an error with the message handler, send an error to the client/server */ 592 593 /* TODO: Send error here */ 594 } 595 596 597 /* TODO: Handle response */ 598 } 599 else 600 { 601 /* TODO: Implement error handling */ 602 debugPrint("No handler available for payload type \"" ~ payloadType ~ "\""); 603 } 604 605 return dispatchStatus; 606 } 607 608 609 /** 610 * Given the headerBlock, this returns the requested scope 611 * of the connection. 612 */ 613 private Scope getConnectionScope(JSONValue headerBlock) 614 { 615 /* TODO: Type checking and bounds checking */ 616 617 /* Get the scope block */ 618 JSONValue scopeBlock = headerBlock["scope"]; 619 string scopeString = scopeBlock.str(); 620 621 if(cmp(scopeString, "client") == 0) 622 { 623 return Scope.CLIENT; 624 } 625 else if(cmp(scopeString, "server") == 0) 626 { 627 return Scope.SERVER; 628 } 629 630 return Scope.UNKNOWN; 631 } 632 633 /* Process the received message */ 634 private void processMessage(byte[] messageBuffer) 635 { 636 /* The message as a JSONValue struct */ 637 JSONValue jsonMessage; 638 639 640 /* Attempt to convert the message to JSON */ 641 try 642 { 643 /* Convert message to JSON */ 644 jsonMessage = parseJSON(cast(string)messageBuffer); 645 debugPrint("<<< Received JSON >>>\n\n" ~ jsonMessage.toPrettyString()); 646 647 /* TODO: Bounds checking, type checking */ 648 649 /* Get the header */ 650 JSONValue headerBlock = jsonMessage["header"]; 651 652 653 654 /** 655 * Check to see if this connection is currently "untyped". 656 * 657 * If it is then we set the type. 658 */ 659 if(connectionType == Scope.UNKNOWN) 660 { 661 /* Get the scope of the message */ 662 Scope scopeField = getConnectionScope(headerBlock); 663 664 /* TODO: Authenticate if client, else do ntohing for server */ 665 666 /* Decide what action to take depending on the scope */ 667 if(scopeField == Scope.UNKNOWN) 668 { 669 /* If the host-provided `scope` field was invalid */ 670 debugPrint("Host provided scope was UNKNOWN"); 671 672 /* TODO: Send message back about an invalid scope */ 673 674 /* Close the connection */ 675 clientConnection.close(); 676 } 677 else if(scopeField == Scope.CLIENT) 678 { 679 /** 680 * If the host-provided `scope` field is `Scope.CLIENT` 681 * then we must attempt authentication, if it fails 682 * send the client a message back and then close the 683 * connection. 684 */ 685 686 /* Get the authentication block */ 687 JSONValue authenticationBlock = headerBlock["authentication"]; 688 689 /* Get the username and password */ 690 string username = authenticationBlock["username"].str(), password = authenticationBlock["password"].str(); 691 692 /* Attempt authentication */ 693 bool authenticationStatus = server.authenticate(username, password); 694 695 /* Check if the authentication was successful or not */ 696 if(authenticationStatus) 697 { 698 /** 699 * If the authentication was successful then store the 700 * client's credentials. 701 */ 702 this.username = username; 703 this.password = password; 704 } 705 else 706 { 707 /** 708 * If the authentication was unsuccessful then send a 709 * message to the client stating so and close the connection. 710 */ 711 debugPrint("Authenticating the user failed, sending error and closing connection."); 712 713 /* TODO : Send error message to client */ 714 715 /* Close the connection */ 716 clientConnection.close(); 717 } 718 } 719 720 721 /* Set the connection type to `scopeField` */ 722 connectionType = scopeField; 723 } 724 else 725 { 726 /* TODO: Implement worker here */ 727 } 728 729 730 /* Get the payload block */ 731 JSONValue payloadBlock = jsonMessage["payload"]; 732 debugPrint("<<< Payload is >>>\n\n" ~ payloadBlock.toPrettyString()); 733 734 735 /* Dispatch the message */ 736 bool dispatchStatus = dispatchMessage(connectionType, payloadBlock); 737 738 if(dispatchStatus) 739 { 740 debugPrint("Dispatch succeeded"); 741 } 742 else 743 { 744 /* TODO: Error handling */ 745 debugPrint("Dispatching failed..."); 746 } 747 } 748 /* If thr attempt to convert the message to JSON fails */ 749 catch(JSONException exception) 750 { 751 debugPrint("<<< There was an error whilst parsing the JSON message >>>\n\n"~exception.toString()); 752 } 753 754 /* TODO: Return value */ 755 756 } 757 758 759 }