1 module dash.net.connection; 2 import dash.net.webconnection, dash.net.packets; 3 4 import msgpack; 5 import core.time, core.sync.mutex; 6 import std.concurrency, std.traits, std.conv, std.stdio; 7 8 enum typeName( T ) = fullyQualifiedName!( Unqual!( T ) ); 9 10 template ordFuncs( T ) 11 { 12 shared void delegate( T data )[][shared Connection] ordFuncs; 13 } 14 15 shared class Connection 16 { 17 public: 18 static shared(Connection) open( string ipAddress, bool host, ConnectionType[] types... ) 19 { 20 assert( types.length > 0, "Please give more than 0 args." ); 21 22 auto conn = new shared Connection; 23 24 foreach( type; types ) 25 { 26 switch ( type ) 27 { 28 case ConnectionType.TCP: 29 conn.connections[ ConnectionType.TCP ] = new shared TCPConnection( ipAddress, host ); 30 break; 31 32 case ConnectionType.UDP: 33 conn.connections[ ConnectionType.UDP ] = new shared UDPConnection( ipAddress, host ); 34 break; 35 36 default: 37 break; 38 } 39 } 40 41 // security and handshake stuff 42 43 44 conn.startRecieve(); 45 conn._isOpen = true; 46 47 return conn; 48 } 49 50 @property bool isOpen() { return _isOpen; } 51 52 // Delegates for recieving data 53 /// List of delegates to call when recieving a packet of type T 54 ref shared(void delegate( T )[]) onReceiveData( T )() 55 { 56 // Function that handles data and calls events 57 void callData( T )( ubyte[] data ) 58 { 59 auto result = unpack!T( data ); 60 foreach( dta; onReceiveData!T ) 61 dta( result ); 62 } 63 64 if( !( this in ordFuncs!T ) ) 65 ordFuncs!T[ this ] = []; 66 67 if( typeName!T !in onReceiveFuncs ) 68 onReceiveFuncs[ typeName!T ] = &callData!T; 69 70 return ordFuncs!T[ this ]; 71 } 72 73 void login( string username, ConnectionType type = ConnectionType.TCP ) 74 { 75 auto pack = new LoginPacket; 76 pack.username = username; 77 78 Packer packer; 79 packer.pack( PacketType.Login ); 80 packer.pack( pack ); 81 82 // only for steve's chat server 83 auto buffer = packer.stream.data; 84 buffer[1] = cast(ubyte)(buffer[2] - 0xa0); 85 buffer[2] = 0; 86 87 connections[ type ].send( buffer ); 88 } 89 90 void logoff( string username, ConnectionType type = ConnectionType.TCP ) 91 { 92 auto pack = new LogoffPacket; 93 pack.username = username; 94 95 Packer packer; 96 packer.pack( PacketType.Logoff ); 97 packer.pack( pack ); 98 99 connections[ type ].send( packer.stream.data ); 100 } 101 102 void whisper( string target, string message, ConnectionType type = ConnectionType.TCP ) 103 { 104 auto pack = new WhisperPacket; 105 pack.target = target; 106 pack.message = message; 107 108 Packer packer; 109 packer.pack( PacketType.Whisper ); 110 packer.pack( pack ); 111 112 connections[ type ].send( packer.stream.data ); 113 } 114 115 void send( T... )( T args, ConnectionType type = ConnectionType.TCP ) if ( T.length > 0 ) 116 { 117 foreach( ii, arg; args ) 118 { 119 PacketType packetType; 120 121 packetType = PacketType.Data; 122 123 Packer packer; 124 packer.pack( packetType ); 125 126 switch( packetType ) 127 { 128 /*case PacketType.Handshake: 129 auto hspack = new HandshakePacket; 130 131 // Assign values 132 133 packer.pack( hspack ); 134 break;*/ 135 136 case PacketType.Data: 137 auto dpack = new DataPacket; 138 139 dpack.type = typeName!(typeof(arg)); 140 dpack.data = pack( arg ); 141 142 packer.pack( dpack ); 143 break; 144 145 default: 146 assert( false, "Packet type not defined in switch." ); 147 } 148 149 connections[ type ].send( packer.stream.data ); 150 } 151 } 152 153 void update() 154 { 155 synchronized( this ) 156 { 157 if( buffer.length ) 158 { 159 packets ~= buffer; 160 buffer = []; 161 } 162 } 163 164 // Return if not open 165 if( !isOpen ) 166 return; 167 168 foreach( buf; packets ) 169 { 170 if( !buf.length ) 171 { 172 close(); 173 throw new Exception( "Connection Closed" ); 174 } 175 176 auto unpacker = Unpacker( cast(ubyte[])buf ); 177 178 PacketType packType; 179 unpacker.unpack( packType ); 180 181 switch( packType ) 182 { 183 /*case PacketType.Handshake: 184 HandshakePacket pack; 185 unpacker.unpack( pack ); 186 break;*/ 187 188 case PacketType.Data: 189 DataPacket pack; 190 unpacker.unpack( pack ); 191 192 if( pack.type in onReceiveFuncs ) 193 onReceiveFuncs[ pack.type ]( pack.data ); 194 else 195 writeln( "No onRecieveData event for ", pack.type ); 196 197 break; 198 199 default: 200 //assert( false, "Packet type not defined in switch." ); 201 } 202 } 203 204 packets = []; 205 } 206 207 void close() 208 { 209 foreach( thread; childrenThreads ) 210 std.concurrency.send( cast(Tid)thread, "done" ); 211 212 foreach( conn; connections ) 213 conn.close(); 214 215 _isOpen = false; 216 } 217 218 private: 219 WebConnection[ ConnectionType ] connections; 220 void delegate( ubyte[] )[string] onReceiveFuncs; 221 Tid[] childrenThreads; 222 bool _isOpen; 223 ubyte[][] packets; 224 ubyte[][] buffer; 225 226 void startRecieve() 227 { 228 foreach( web; connections ) 229 { 230 childrenThreads ~= cast(shared)spawn( ( ref shared(Connection) conn, shared WebConnection webcon ) 231 { 232 shared(ubyte[]) buf = new shared(ubyte[ 1024 ]); 233 234 while( true ) 235 { 236 webcon.recv( buf ); 237 238 if( receiveTimeout( dur!"msecs"( 0 ), (string x) { } ) ) 239 break; 240 241 synchronized( conn ) 242 { 243 conn.buffer ~= buf.dup; 244 } 245 } 246 }, this, web ); 247 } 248 } 249 }