module dash.editor.websockets;
import dash.editor.editor;
import dash.utility.output, dash.utility.config;

import vibe.core.core;
import vibe.http.websockets, vibe.http.server, vibe.http.router;
import vibe.data.json;
import core.time;

/**
 * WebSocket endpoint used by the editor.
 *
 * Incoming text frames are stored in the module-level `incomingBuffers` by
 * `handleConnection`; `update` drains that buffer, decodes each entry as an
 * `EventMessage` and queues it on the owning `Editor`. Outgoing messages are
 * serialized by `send` and appended to the module-level `outgoingBuffers`.
 */
struct WebSocketServer
{
public:
    /**
     * Remembers the editor and starts listening for WebSocket connections.
     *
     * The route and port are read from `config.editor`; the listener is bound
     * to the loopback addresses "::1" and "127.0.0.1".
     *
     * Params:
     *  editor = The editor that decoded events are queued on.
     */
    void start( Editor editor )
    {
        this.editor = editor;

        auto routes = new URLRouter;
        routes.get( "/" ~ config.editor.route, handleWebSockets( &handleConnection ) );

        auto serverSettings = new HTTPServerSettings;
        serverSettings.port = config.editor.port;
        serverSettings.bindAddresses = [ "::1", "127.0.0.1" ];

        listenHTTP( serverSettings, routes );
    }

    /**
     * Runs `processEvents()`, then drains the received messages and forwards
     * every well-formed one to the editor.
     *
     * A message is dropped (with a log entry) when it is not valid JSON for
     * an `EventMessage`, when its key is empty, or when its value is null or
     * undefined.
     */
    void update()
    {
        processEvents();

        // Local snapshot of everything received since the last call.
        string[] pending;

        // NOTE(review): this length check happens outside the lock; the
        // actual copy and clear below are done while holding the mutex.
        if( incomingBuffers.length != 0 )
        {
            synchronized( incomingBuffersMutex )
            {
                // Duplicate each entry so it can be used after the lock is released.
                foreach( raw; incomingBuffers )
                    pending ~= cast(string)raw.dup;

                // Everything has been copied; empty the shared buffer.
                incomingBuffers.length = 0;
            }
        }

        foreach( text; pending )
        {
            EventMessage parsed;
            try
            {
                parsed = text.deserializeJson!EventMessage();
            }
            catch( JSONException e )
            {
                errorf( "Invalid json string sent: %s", text );
                continue;
            }

            if( parsed.key.length == 0 )
            {
                warning( "Received a packet without a \"key.\"" );
            }
            else if( parsed.value.type == Json.Type.undefined || parsed.value.type == Json.Type.null_ )
            {
                warning( "Received a packet without a \"value.\"" );
            }
            else
            {
                editor.queueEvent( parsed );
            }
        }
    }

    /**
     * Serializes a message to JSON text and appends it to the outgoing buffer.
     *
     * Params:
     *  msg = The message to queue for sending.
     */
    void send( EventMessage msg )
    {
        // Serialize before taking the lock so the critical section is only the append.
        shared string serialized = msg.serializeToJsonString();

        synchronized( outgoingBuffersMutex )
        {
            outgoingBuffers ~= serialized;
        }
    }

    /// Asks the event loop to exit.
    void stop()
    {
        exitEventLoop( true );
    }

private:
    /// The editor that receives decoded events; set by `start`.
    Editor editor;
}

package:
/// The type-safe form the cross layer communications should take
struct EventMessage
{
    /// Name of the event; `WebSocketServer.update` rejects messages where this is empty.
    string key;
    /// Payload of the event; `WebSocketServer.update` rejects null or undefined values.
    Json value;
    /// Callback identifier carried with the message.
    string callbackId;
}
private:
/// Text frames received from sockets, waiting to be processed.
shared string[] incomingBuffers;
/// Serialized messages waiting to be written to sockets.
/// NOTE(review): nothing visible in this module ever shrinks this array —
/// confirm whether it is trimmed elsewhere.
shared string[] outgoingBuffers;

/// Empty class whose instances are used as monitors for `synchronized` blocks.
shared class Mutex { }
/// Guards `incomingBuffers`.
shared Mutex incomingBuffersMutex;
/// Guards `outgoingBuffers`.
shared Mutex outgoingBuffersMutex;

/// Creates the two lock objects once, at module initialisation.
shared static this()
{
    incomingBuffersMutex = new shared Mutex();
    outgoingBuffersMutex = new shared Mutex();
}

/**
 * Services a single WebSocket connection until it disconnects.
 *
 * Each pass first stores every available text frame in `incomingBuffers`
 * (waiting up to 100 ms for data), then writes any entries of
 * `outgoingBuffers` this connection has not sent yet.
 *
 * The send position starts at the length `outgoingBuffers` has when the
 * handler begins, so entries already in the buffer at that point are not
 * sent on this socket.
 *
 * Params:
 *  socket = The connection to service.
 */
void handleConnection( scope WebSocket socket )
{
    // Index of the first outgoing entry this connection has not sent yet.
    // NOTE(review): read without holding outgoingBuffersMutex.
    size_t sentCount = outgoingBuffers.length;

    while( socket.connected )
    {
        // Store frames for as long as data keeps arriving within the timeout.
        while( socket.waitForData( 100.msecs ) )
        {
            string received = socket.receiveText();
            synchronized( incomingBuffersMutex )
            {
                incomingBuffers ~= received[];
            }
        }

        // Nothing new to send: go back to checking the connection.
        // NOTE(review): this length comparison is also made outside the lock.
        if( outgoingBuffers.length <= sentCount )
            continue;

        shared string[] unsent;
        synchronized( outgoingBuffersMutex )
        {
            // Snapshot the unsent tail so the lock is not held while sending.
            unsent = outgoingBuffers[ sentCount..outgoingBuffers.length ].dup;
            // Advance the position past everything that was just copied.
            sentCount = outgoingBuffers.length;
        }

        foreach( payload; unsent )
        {
            socket.send( payload );
        }
    }
}