1 module connection.connection;
2 
3 import utils.debugging : debugPrint;
4 import std.conv : to;
5 import std.socket : Socket, AddressFamily, SocketType, ProtocolType, parseAddress, SocketFlags, Address;
6 import core.thread : Thread;
7 import std.stdio : writeln, File;
8 import std.json : JSONValue, parseJSON, JSONException, JSONType, toJSON;
9 import std.string : cmp;
10 import handlers.handler;
11 import listeners.listener;
12 import server.server;
13 import handlers.response;
14 
15 
/**
 * A single connection to the Bester server, serviced on its own thread.
 *
 * Messages on the wire (both client <-> server and server <-> message
 * handler) are framed as a 4-byte little-endian length prefix followed by
 * that many bytes of JSON text.
 */
public final class BesterConnection : Thread
{

	/* The socket to the client */
	private Socket clientConnection;

	/* The server backend */
	public BesterServer server;

	/* The client's credentials (only set after a successful authentication) */
	private string username;
	private string password;

	/* The connection scope */
	public enum Scope
	{
		CLIENT,
		SERVER,
		UNKNOWN
	}

	/* Size (in bytes) of the length prefix that precedes every message */
	private enum size_t LENGTH_PREFIX_SIZE = 4;

	/* The type of this connection (UNKNOWN until the first valid message) */
	private Scope connectionType = Scope.UNKNOWN;

	/**
	 * Returns the scope this connection was typed as, or `Scope.UNKNOWN`
	 * if no message has set it yet.
	 */
	public Scope getType()
	{
		return connectionType;
	}

	/**
	 * Creates the connection handler for an accepted socket.
	 *
	 * Params:
	 *   clientConnection = the socket to the remote host
	 *   server = the server this connection belongs to
	 */
	this(Socket clientConnection, BesterServer server)
	{
		/* Save socket and set thread worker function pointer */
		super(&run);
		this.clientConnection = clientConnection;
		this.server = server;

		debugPrint("New client handler spawned for " ~ clientConnection.remoteAddress().toAddrString());
	}

	/* Returns the remote host's address as a string */
	override public string toString()
	{
		return clientConnection.remoteAddress().toAddrString();
	}

	/* Returns the stored credentials as `[username, password]` */
	public string[] getCredentials()
	{
		return [username, password];
	}

	/* Send a message to the user/server */
	public void sendMessage(JSONValue replyMessage)
	{
		/* TODO: Implement me */
	}

	/**
	 * Reads exactly `count` bytes from `source` into `data`.
	 *
	 * Never asks the socket for more than is still missing, so bytes that
	 * belong to whatever follows on the stream are left untouched.
	 *
	 * Returns: `true` when all `count` bytes were read, `false` if the
	 * socket reported an error or was closed first (`data` then holds
	 * whatever had been read so far).
	 */
	private static bool receiveExactly(Socket source, size_t count, out byte[] data)
	{
		/* Read in bounded chunks so a huge announced length does not allocate up front */
		byte[4096] chunk;

		while(data.length < count)
		{
			size_t wanted = count - data.length;
			if(wanted > chunk.length)
			{
				wanted = chunk.length;
			}

			ptrdiff_t received = source.receive(chunk[0 .. wanted]);

			/* 0 means the peer closed the connection, negative means error */
			if(received <= 0)
			{
				return false;
			}

			data ~= chunk[0 .. cast(size_t)received];
		}

		return true;
	}

	/**
	 * Reads one length-prefixed message from `source` into `message`.
	 *
	 * NOTE(review): the announced length is not capped; a peer may
	 * announce up to `uint.max` bytes. Consider adding a limit.
	 *
	 * Returns: `true` when a complete message was read, `false` otherwise.
	 */
	private static bool receiveFrame(Socket source, out byte[] message)
	{
		import std.bitmanip : littleEndianToNative;

		byte[] lengthPrefix;
		if(!receiveExactly(source, LENGTH_PREFIX_SIZE, lengthPrefix))
		{
			return false;
		}

		/* Decode the little-endian length prefix */
		ubyte[LENGTH_PREFIX_SIZE] prefixBytes;
		prefixBytes[] = cast(ubyte[])(lengthPrefix[0 .. LENGTH_PREFIX_SIZE]);
		uint messageLength = littleEndianToNative!uint(prefixBytes);
		writeln("Message length: ", messageLength);

		return receiveExactly(source, messageLength, message);
	}

	/**
	 * Sends all of `data` over `destination`, retrying on partial sends.
	 *
	 * Returns: `true` when every byte was sent, `false` on a socket error.
	 */
	private static bool sendAll(Socket destination, const(byte)[] data)
	{
		while(data.length > 0)
		{
			ptrdiff_t sent = destination.send(data);

			/* A non-positive result for a non-empty buffer is treated as failure */
			if(sent <= 0)
			{
				return false;
			}

			data = data[cast(size_t)sent .. $];
		}

		return true;
	}

	/**
	 * Builds the wire representation of `payload`: a 4-byte little-endian
	 * length followed by the payload bytes.
	 */
	private static byte[] encodeFrame(string payload)
	{
		import std.bitmanip : nativeToLittleEndian;

		ubyte[LENGTH_PREFIX_SIZE] lengthPrefix = nativeToLittleEndian(cast(uint)payload.length);

		byte[] frame = cast(byte[])(lengthPrefix[].dup);
		frame ~= cast(byte[])payload;

		return frame;
	}

	/**
	 * Sends `payload` to `chosenHandler` over a socket obtained from
	 * `getNewSocket()` and reads the handler's framed reply into `response`.
	 *
	 * The socket is closed before returning; this assumes `getNewSocket()`
	 * hands out a socket owned by the caller - TODO confirm.
	 *
	 * Returns: `true` when a complete reply was received, `false` otherwise.
	 */
	private bool exchangeWithHandler(MessageHandler chosenHandler, JSONValue payload, out byte[] response)
	{
		/* Handler's UNIX domain socket */
		Socket handlerSocket = chosenHandler.getNewSocket();
		if(handlerSocket is null)
		{
			debugPrint("Could not obtain a socket for handler \"" ~ chosenHandler.getPluginName() ~ "\".");
			return false;
		}

		/* Always release the handler socket, whatever happens below */
		scope(exit) handlerSocket.close();

		/* Length-prefix the payload */
		byte[] sendBuffer = encodeFrame(toJSON(payload));
		writeln("Send buffer: ", sendBuffer);

		debugPrint("Sending payload over to handler for \"" ~ chosenHandler.getPluginName() ~ "\".");
		if(!sendAll(handlerSocket, sendBuffer))
		{
			debugPrint("Error sending to UNIX domain socket");
			return false;
		}

		debugPrint("Waiting for response from handler for \"" ~ chosenHandler.getPluginName() ~ "\".");
		if(!receiveFrame(handlerSocket, response))
		{
			debugPrint("Error receiving from UNIX domain socket");
			return false;
		}

		writeln("MEssage ", response);

		return true;
	}

	/**
	 * Read/dispatch loop: reads one framed message at a time from the
	 * remote host and processes it. Returns (ending the thread) as soon
	 * as a message cannot be read in full.
	 */
	private void run()
	{
		while(true)
		{
			/* Full message buffer */
			byte[] messageBuffer;

			/* TODO: Add timeout if we haven't received a message in a certain amount of time */
			if(!receiveFrame(clientConnection, messageBuffer))
			{
				/* TODO: Handle error here */
				debugPrint("Error with receiving");
				return;
			}

			/* Process the message */
			processMessage(messageBuffer);
		}
	}

	/**
	 * Version 1 of the message dispatcher: looks up the handler whose
	 * plugin name equals `payloadType`, sends it `payload` and reads the
	 * reply (which is currently only printed).
	 *
	 * Returns: `true` when a handler was found and replied in full,
	 * `false` otherwise.
	 */
	private bool dispatch(string payloadType, JSONValue payload)
	{
		debugPrint("Dispatching payload [" ~ payloadType ~ "]");
		debugPrint("Payload: " ~ payload.toPrettyString());

		/* Status of dispatch */
		bool dispatchStatus = true;

		/* Lookup the payloadType handler */
		MessageHandler chosenHandler;

		for(uint i = 0; i < server.handlers.length; i++)
		{
			if(cmp(server.handlers[i].getPluginName(), payloadType) == 0)
			{
				chosenHandler = server.handlers[i];
				break;
			}
		}

		/* Check if a handler was found */
		if(chosenHandler)
		{
			debugPrint("chosenHandler.socketPath: " ~ chosenHandler.socketPath);

			/* Exchange the payload for the handler's response */
			byte[] fullMessage;
			dispatchStatus = exchangeWithHandler(chosenHandler, payload, fullMessage);

			/* TODO: So now we have to think about what the hell it means
			 * for a response to be received, like cool and all, but we need
			 * the server to now do something.
			 */
		}
		else
		{
			/* TODO: Error handling */
			debugPrint("No message handler for payload type \"" ~ payloadType ~ "\" found.");
			dispatchStatus = false;
		}

		debugPrint("Dispatch status: " ~ to!(string)(dispatchStatus));

		return dispatchStatus;
	}


	/**
	 * Sends `payload` to `chosenHandler` and returns the handler's reply
	 * parsed as JSON.
	 *
	 * Throws: `JSONException` if no complete reply could be obtained from
	 * the handler, or if the reply is not valid JSON.
	 */
	private JSONValue handlerRun(MessageHandler chosenHandler, JSONValue payload)
	{
		byte[] fullMessage;

		if(!exchangeWithHandler(chosenHandler, payload, fullMessage))
		{
			/* Fail loudly instead of waiting forever on a dead handler socket */
			throw new JSONException("No complete response received from handler \"" ~ chosenHandler.getPluginName() ~ "\"");
		}

		return parseJSON(cast(string)fullMessage);
	}



	/**
	 * Version 2 of the message dispatcher.
	 *
	 * `payloadBlock` must carry a string `type` and a `data` member. The
	 * type `builtin` is handled by the server itself (only the `close`
	 * command exists); any other type is forwarded to the handler returned
	 * by `server.findHandler`, whose response is then executed.
	 *
	 * Params:
	 *   scopeField = the scope of this connection (currently unused)
	 *   payloadBlock = the `payload` block of the received message
	 *
	 * Returns: `true` when the payload was handled, `false` when there was
	 * no handler, the built-in command was unknown, or the handler failed.
	 *
	 * Throws: `JSONException` if `payloadBlock` lacks a required member or
	 * a member has the wrong type.
	 */
	private bool dispatchMessage(Scope scopeField, JSONValue payloadBlock)
	{
		/* Status of dispatch */
		bool dispatchStatus = true;

		/* TODO: Bounds checking, type checking */

		/* Get the payload type */
		string payloadType = payloadBlock["type"].str;
		debugPrint("Payload type is \"" ~ payloadType ~ "\"");

		/* Get the payload data */
		JSONValue payloadData = payloadBlock["data"];

		/* Lookup the payloadType handler */
		MessageHandler chosenHandler = server.findHandler(payloadType);

		/* Check if the payload is a built-in command */
		if(cmp(payloadType, "builtin") == 0)
		{
			debugPrint("Built-in payload type");

			/**
			 * Built-in commands follow the structure of 
			 * "command" : {"type" : "cmdType", "args" : ...}
			 */
			JSONValue commandBlock = payloadData["command"];
			string commandType = commandBlock["type"].str;

			/* Not used yet, but the lookup rejects commands without `args` */
			JSONValue command = commandBlock["args"];

			/* If the command is `close` */
			if(cmp(commandType, "close") == 0)
			{
				debugPrint("Closing socket...");
				this.clientConnection.close();
			}
			else
			{
				debugPrint("Invalid built-in command type");
				/* TODO: Generate error response */
				dispatchStatus = false;
			}
		}
		/* If an external handler is found (i.e. not a built-in command) */
		else if(chosenHandler)
		{
			debugPrint("Chosen handler for payload type \"" ~ payloadType ~ "\" is " ~ chosenHandler.getPluginName());

			try
			{
				HandlerResponse handlerResponse = new HandlerResponse(handlerRun(chosenHandler, payloadData));	

				/* TODO: Continue here, we will make all error handling do on construction as to make this all more compact */
				debugPrint("<<< Message Handler [" ~ chosenHandler.getPluginName() ~ "] response >>>\n\n" ~ handlerResponse.toString());

				/* Execute the message handler's command */
				handlerResponse.execute(this);
			}
			catch(ResponseError e)
			{
				/* In the case of an error with the message handler, send an error to the client/server */
				dispatchStatus = false;

				/* TODO: Send error here */
			}
			catch(JSONException e)
			{
				/* The handler did not reply, or replied with something that is not valid JSON */
				debugPrint("Message handler \"" ~ chosenHandler.getPluginName() ~ "\" gave no usable response: " ~ e.msg);
				dispatchStatus = false;

				/* TODO: Send error here */
			}

			/* TODO: Handle response */
		}
		else
		{
			/* TODO: Implement error handling */
			debugPrint("No handler available for payload type \"" ~ payloadType ~ "\"");
			dispatchStatus = false;
		}

		return dispatchStatus;
	}


	/**
	 * Given the headerBlock, this returns the requested scope
	 * of the connection: `"client"` maps to `Scope.CLIENT`, `"server"` to
	 * `Scope.SERVER` and anything else to `Scope.UNKNOWN`.
	 *
	 * Throws: `JSONException` if `scope` is missing or is not a string.
	 */
	private Scope getConnectionScope(JSONValue headerBlock)
	{
		/* TODO: Type checking and bounds checking */

		/* Get the scope block */
		JSONValue scopeBlock = headerBlock["scope"];
		string scopeString = scopeBlock.str();
		
		if(cmp(scopeString, "client") == 0)
		{
			return Scope.CLIENT;
		}
		else if(cmp(scopeString, "server") == 0)
		{
			return Scope.SERVER;
		}

		return Scope.UNKNOWN;
	}

	/**
	 * Processes one received message.
	 *
	 * The message is parsed as JSON. While the connection is still
	 * untyped, the header's `scope` decides its type; a client scope must
	 * additionally authenticate. If the scope is invalid or authentication
	 * fails the connection is closed and the payload is NOT dispatched.
	 * Otherwise the `payload` block is handed to `dispatchMessage`.
	 *
	 * JSON errors (malformed text, missing members, wrong types) are
	 * logged and the message is dropped.
	 */
	private void processMessage(byte[] messageBuffer)
	{
		/* Attempt to convert the message to JSON */
		try
		{
			/* Convert message to JSON */
			JSONValue jsonMessage = parseJSON(cast(string)messageBuffer);
			debugPrint("<<< Received JSON >>>\n\n" ~ jsonMessage.toPrettyString());

			/* TODO: Bounds checking, type checking */

			/* Get the header */
			JSONValue headerBlock = jsonMessage["header"];

			/**
			 * Check to see if this connection is currently "untyped".
			 *
			 * If it is then we set the type.
			 */
			if(connectionType == Scope.UNKNOWN)
			{
				/* Get the scope of the message */
				Scope scopeField = getConnectionScope(headerBlock);

				/* Decide what action to take depending on the scope */
				if(scopeField == Scope.UNKNOWN)
				{
					/* If the host-provided `scope` field was invalid */
					debugPrint("Host provided scope was UNKNOWN");

					/* TODO: Send message back about an invalid scope */

					/* Close the connection and drop the message */
					clientConnection.close();
					return;
				}
				else if(scopeField == Scope.CLIENT)
				{
					/**
					 * If the host-provided `scope` field is `Scope.CLIENT`
					 * then we must attempt authentication, if it fails
					 * send the client a message back and then close the
					 * connection.
					 */

					/* Get the authentication block */
					JSONValue authenticationBlock = headerBlock["authentication"];

					/* Get the username and password */
					string givenUsername = authenticationBlock["username"].str();
					string givenPassword = authenticationBlock["password"].str();

					/* Attempt authentication */
					if(!server.authenticate(givenUsername, givenPassword))
					{
						/**
						 * If the authentication was unsuccessful then send a
						 * message to the client stating so and close the connection.
						 */
						debugPrint("Authenticating the user failed, sending error and closing connection.");

						/* TODO : Send error message to client */

						/* Close the connection; never dispatch an unauthenticated payload */
						clientConnection.close();
						return;
					}

					/* Authentication succeeded, store the client's credentials */
					this.username = givenUsername;
					this.password = givenPassword;
				}

				/* Set the connection type to `scopeField` */
				connectionType = scopeField;
			}
			else
			{
				/* TODO: Implement worker here */
			}

			/* Get the payload block */
			JSONValue payloadBlock = jsonMessage["payload"];
			debugPrint("<<< Payload is >>>\n\n" ~ payloadBlock.toPrettyString());

			/* Dispatch the message */
			bool dispatchStatus = dispatchMessage(connectionType, payloadBlock);

			if(dispatchStatus)
			{
				debugPrint("Dispatch succeeded");
			}
			else
			{
				/* TODO: Error handling */
				debugPrint("Dispatching failed...");
			}
		}
		/* If the attempt to convert the message to JSON fails */
		catch(JSONException exception)
		{
			debugPrint("<<< There was an error whilst parsing the JSON message >>>\n\n"~exception.toString());
		}

		/* TODO: Return value */
	}

}