1 module dash.net.connection;
2 import dash.net.webconnection, dash.net.packets;
3 
4 import msgpack;
5 import core.time, core.sync.mutex;
6 import std.concurrency, std.traits, std.conv, std.stdio;
7 
/// Fully qualified name of T with its outermost type qualifiers stripped (via Unqual).
template typeName( T )
{
	enum typeName = fullyQualifiedName!( Unqual!T );
}
9 
/// Per-payload-type table that maps each connection to the list of delegates
/// taking a value of type T. One table exists per instantiation with T.
template ordFuncs( T )
{
	// Signature of a single receive handler for payloads of type T.
	alias Handler = void delegate( T data );

	shared Handler[][shared Connection] ordFuncs;
}
14 
/**
 * Bundles one WebConnection per ConnectionType behind a single object.
 *
 * Outgoing values are serialized with msgpack, prefixed with a PacketType tag
 * and sent over the selected underlying connection. Incoming bytes are queued
 * by one receiver thread per underlying connection and are dispatched to the
 * registered delegates when update() is called.
 */
shared class Connection
{
public:
	/**
	 * Creates a Connection, opens one underlying connection per requested
	 * type and starts the receiver threads.
	 *
	 * Params:
	 *  ipAddress = address passed to every underlying connection.
	 *  host      = passed unchanged to every underlying connection.
	 *  types     = connection types to open; at least one is required.
	 *              Values other than TCP and UDP are silently skipped.
	 *
	 * Returns: the new connection, already flagged as open.
	 */
	static shared(Connection) open( string ipAddress, bool host, ConnectionType[] types... )
	{
		assert( types.length > 0, "Please give more than 0 args." );

		auto conn = new shared Connection;

		foreach( type; types )
		{
			switch ( type )
			{
				case ConnectionType.TCP:
					conn.connections[ ConnectionType.TCP ] = new shared TCPConnection( ipAddress, host );
					break;

				case ConnectionType.UDP:
					conn.connections[ ConnectionType.UDP ] = new shared UDPConnection( ipAddress, host );
					break;

				default:
					break;
			}
		}

		// security and handshake stuff


		conn.startRecieve();
		conn._isOpen = true;

		return conn;
	}

	/// Whether the connection is currently flagged as open.
	@property bool isOpen() { return _isOpen; }

	// Delegates for receiving data
	/**
	 * List of delegates to call when receiving a packet of type T.
	 *
	 * The list is returned by reference so callers can append to it. On first
	 * use for a given T this also creates the (empty) list for this connection
	 * and registers, under T's type name, the dispatcher that unpacks incoming
	 * data as T and invokes every delegate in the list.
	 */
	ref shared(void delegate( T )[]) onReceiveData( T )()
	{
		// Function that handles data and calls events
		void callData( T )( ubyte[] data )
		{
			auto result = unpack!T( data );
			foreach( dta; onReceiveData!T )
				dta( result );
		}

		if( !( this in ordFuncs!T ) )
			ordFuncs!T[ this ] = [];

		if( typeName!T !in onReceiveFuncs )
			onReceiveFuncs[ typeName!T ] = &callData!T;

		return ordFuncs!T[ this ];
	}

	/**
	 * Sends a Login packet carrying the given username.
	 *
	 * Params:
	 *  username = name stored in the LoginPacket.
	 *  type     = which underlying connection to send on.
	 */
	void login( string username, ConnectionType type = ConnectionType.TCP )
	{
		auto packet = new LoginPacket;
		packet.username = username;

		auto buffer = encode( PacketType.Login, packet );

		// only for steve's chat server:
		// byte 1 is overwritten with (byte 2 - 0xa0) and byte 2 is zeroed.
		// NOTE(review): presumably 0xa0 is the msgpack fixstr marker, so byte 1
		// ends up holding the username length -- confirm against the server.
		buffer[1] = cast(ubyte)(buffer[2] - 0xa0);
		buffer[2] = 0;

		transmit( type, buffer );
	}

	/**
	 * Sends a Logoff packet carrying the given username.
	 *
	 * Params:
	 *  username = name stored in the LogoffPacket.
	 *  type     = which underlying connection to send on.
	 */
	void logoff( string username, ConnectionType type = ConnectionType.TCP )
	{
		auto packet = new LogoffPacket;
		packet.username = username;

		transmit( type, encode( PacketType.Logoff, packet ) );
	}

	/**
	 * Sends a Whisper packet.
	 *
	 * Params:
	 *  target  = stored in the packet's target field.
	 *  message = stored in the packet's message field.
	 *  type    = which underlying connection to send on.
	 */
	void whisper( string target, string message, ConnectionType type = ConnectionType.TCP )
	{
		auto packet = new WhisperPacket;
		packet.target = target;
		packet.message = message;

		transmit( type, encode( PacketType.Whisper, packet ) );
	}

	/**
	 * Sends each argument as its own Data packet.
	 *
	 * Every argument is msgpack-serialized and wrapped in a DataPacket whose
	 * type field is the argument's unqualified, fully qualified type name.
	 *
	 * Params:
	 *  args = one or more values to send.
	 *  type = which underlying connection to send on.
	 */
	void send( T... )( T args, ConnectionType type = ConnectionType.TCP ) if ( T.length > 0 )
	{
		foreach( arg; args )
		{
			auto dpack = new DataPacket;

			dpack.type = typeName!(typeof(arg));
			dpack.data = pack( arg );

			transmit( type, encode( PacketType.Data, dpack ) );
		}
	}

	/**
	 * Dispatches everything the receiver threads have queued since the last call.
	 *
	 * Queued buffers are first moved (under the object's lock) into the
	 * pending list. If the connection is not open they stay there untouched.
	 * Otherwise each pending buffer is unpacked and, for Data packets, passed
	 * to the dispatcher registered for its type name.
	 *
	 * Each buffer is handed out at most once: when unpacking or a handler
	 * throws, the offending buffer and everything before it are dropped and
	 * the remaining buffers are kept for the next call.
	 *
	 * Throws: Exception("Connection Closed") after closing the connection
	 *         when an empty buffer is encountered; any exception raised while
	 *         unpacking or by a handler propagates to the caller.
	 */
	void update()
	{
		synchronized( this )
		{
			if( buffer.length )
			{
				packets ~= buffer;
				buffer = [];
			}
		}

		// Return if not open
		if( !isOpen )
			return;

		// Number of pending buffers already taken off the list. Trimming on
		// every exit path keeps a bad packet from being retried forever.
		size_t consumed = 0;
		scope( exit )
		{
			if( consumed < packets.length )
				packets = packets[ consumed .. $ ];
			else
				packets = [];
		}

		foreach( buf; packets )
		{
			++consumed;

			if( !buf.length )
			{
				close();
				throw new Exception( "Connection Closed" );
			}

			auto unpacker = Unpacker( cast(ubyte[])buf );

			PacketType packType;
			unpacker.unpack( packType );

			switch( packType )
			{
				/*case PacketType.Handshake:
					HandshakePacket pack;
					unpacker.unpack( pack );
					break;*/

				case PacketType.Data:
					DataPacket pack;
					unpacker.unpack( pack );

					if( pack.type in onReceiveFuncs )
						onReceiveFuncs[ pack.type ]( pack.data );
					else
						writeln( "No onRecieveData event for ", pack.type );

					break;

				default:
					// Other packet types are ignored.
					//assert( false, "Packet type not defined in switch." );
			}
		}
	}

	/**
	 * Tells every receiver thread to stop, closes every underlying connection
	 * and flags this connection as closed. Calling it on a connection that is
	 * already closed does nothing.
	 */
	void close()
	{
		// update() closes the connection before throwing, so a caller's own
		// cleanup may well call close() a second time.
		if( !_isOpen )
			return;

		foreach( thread; childrenThreads )
			std.concurrency.send( cast(Tid)thread, "done" );

		foreach( conn; connections )
			conn.close();

		_isOpen = false;
	}

private:
	WebConnection[ ConnectionType ] connections;       // one underlying connection per opened type
	void delegate( ubyte[] )[string] onReceiveFuncs;   // dispatcher per payload type name
	Tid[] childrenThreads;                             // receiver threads started by startRecieve
	bool _isOpen;
	ubyte[][] packets;                                 // buffers waiting to be dispatched by update()
	ubyte[][] buffer;                                  // buffers appended by receiver threads (guarded by synchronized)

	/// Serializes a packet-type tag followed by the packet itself with msgpack.
	static ubyte[] encode( P )( PacketType kind, P payload )
	{
		Packer packer;
		packer.pack( kind );
		packer.pack( payload );

		return packer.stream.data;
	}

	/// Sends already serialized bytes over the underlying connection of the given type.
	void transmit( ConnectionType type, ubyte[] payload )
	{
		assert( type in connections, "No underlying connection was opened for the requested ConnectionType." );

		connections[ type ].send( payload );
	}

	/// Spawns one receiver thread per underlying connection and records its Tid.
	void startRecieve()
	{
		foreach( web; connections )
		{
			childrenThreads ~= cast(shared)spawn( ( ref shared(Connection) conn, shared WebConnection webcon )
			{
				shared(ubyte[]) buf = new shared(ubyte[ 1024 ]);

				while( true )
				{
					// NOTE(review): the result of recv is not used and a copy of
					// the whole buffer is queued below -- confirm how recv reports
					// the number of bytes actually received.
					webcon.recv( buf );

					// Non-blocking mailbox check: any string message (close()
					// sends "done") ends the thread.
					if( receiveTimeout( dur!"msecs"( 0 ), (string x) { } ) )
						break;

					synchronized( conn )
					{
						conn.buffer ~= buf.dup;
					}
				}
			}, this, web );
		}
	}
}