1 module dash.editor.websockets;
2 import dash.editor.editor;
3 import dash.utility.output, dash.utility.config;
4 
5 import vibe.core.core;
6 import vibe.http.websockets, vibe.http.server, vibe.http.router;
7 import vibe.data.json;
8 import core.time;
9 
/// Hosts the editor's WebSocket endpoint and moves messages between the
/// connected clients and the Editor.
struct WebSocketServer
{
public:
    /**
     * Registers the WebSocket route and starts listening for connections.
     *
     * Params:
     *  editor = Editor that receives the events decoded by update().
     */
    void start( Editor editor )
    {
        this.editor = editor;

        // Serve WebSocket connections on the configured route.
        auto router = new URLRouter;
        router.get( "/" ~ config.editor.route, handleWebSockets( &handleConnection ) );

        // Bind to the IPv6 and IPv4 loopback addresses only.
        auto settings = new HTTPServerSettings;
        settings.port = config.editor.port;
        settings.bindAddresses = [ "::1", "127.0.0.1" ];

        listenHTTP( settings, router );
    }

    /**
     * Calls processEvents(), then decodes every message received since the
     * previous call and queues the valid ones on the editor.
     *
     * Packets that fail to deserialize, that have an empty key, or whose
     * value is null or undefined are logged and dropped.
     */
    void update()
    {
        processEvents();

        // Process messages
        string[] jsonStrings;

        // Unsynchronized peek so the lock is only taken when something is queued.
        if( incomingBuffers.length )
        {
            synchronized( incomingBuffersMutex )
            {
                // Copy the jsons.
                foreach( buffer; incomingBuffers )
                {
                    jsonStrings ~= cast(string)buffer.dup;
                }

                // Clear buffers
                incomingBuffers.length = 0;
            }
        }

        foreach( jsonStr; jsonStrings )
        {
            EventMessage msg;
            try
            {
                msg = jsonStr.deserializeJson!EventMessage();
            }
            // Catch Exception rather than only JSONException: a packet that
            // is valid JSON but does not match EventMessage (for example a
            // missing field) is reported with a plain Exception, and a bad
            // packet from a client must not escape the update loop.
            catch( Exception e )
            {
                errorf( "Invalid json string sent: %s", jsonStr );
                continue;
            }

            if( msg.key.length == 0 )
            {
                warning( "Received a packet without a \"key.\"" );
                continue;
            }
            if( msg.value.type == Json.Type.null_ || msg.value.type == Json.Type.undefined )
            {
                warning( "Received a packet without a \"value.\"" );
                continue;
            }

            editor.queueEvent( msg );
        }
    }

    /**
     * Serializes a message to JSON and appends it to the outgoing queue,
     * which handleConnection reads from.
     *
     * Params:
     *  msg = The message to queue for sending.
     */
    void send( EventMessage msg )
    {
        shared string jsonStr = msg.serializeToJsonString();

        synchronized( outgoingBuffersMutex )
        {
            outgoingBuffers ~= jsonStr;
        }
    }

    /// Requests that the event loop exit.
    void stop()
    {
        // NOTE(review): `true` presumably asks all threads to shut down - confirm against vibe.d docs.
        exitEventLoop( true );
    }

private:
    /// Editor that receives the events decoded in update(); set by start().
    Editor editor;
}
92 
93 package:
/// The type-safe form that cross-layer communications should take.
struct EventMessage
{
    /// Name of the event; WebSocketServer.update drops packets where this is empty.
    string key;

    /// Event payload; WebSocketServer.update drops packets where this is null or undefined.
    Json value;

    /// Callback identifier carried with the message.
    /// NOTE(review): not read anywhere in this module - confirm its use against the editor code.
    string callbackId;
}
101 
102 private:
/// Empty class whose instances exist only to be locked by the
/// `synchronized` statements in this module.
shared class Mutex { }

/// JSON strings received from clients: appended by handleConnection,
/// then copied out and cleared by WebSocketServer.update.
shared string[] incomingBuffers;

/// JSON strings waiting to be sent: appended by WebSocketServer.send and
/// read by handleConnection.
shared string[] outgoingBuffers;

/// Lock object guarding incomingBuffers.
shared Mutex incomingBuffersMutex;

/// Lock object guarding outgoingBuffers.
shared Mutex outgoingBuffersMutex;

/// Creates both lock objects once, before the buffers are used.
shared static this()
{
    incomingBuffersMutex = new shared Mutex();
    outgoingBuffersMutex = new shared Mutex();
}
116 
/**
 * Services one WebSocket connection until it is no longer connected.
 *
 * Each pass waits up to 100 ms for an incoming text frame and, if one
 * arrives, appends it to incomingBuffers. It then sends every entry added
 * to outgoingBuffers since the previous pass.
 *
 * Params:
 *  socket = The connection to service.
 */
void handleConnection( scope WebSocket socket )
{
    // Start at the current end of the queue, so only messages queued from
    // now on are sent to this client.
    size_t outgoingMessagesSent = outgoingBuffers.length;

    while( socket.connected )
    {
        // Read at most one frame per pass. Looping here until the client
        // goes quiet would keep the outgoing queue from being flushed for
        // as long as the client sends a frame at least every 100 ms.
        if( socket.waitForData( 100.msecs ) )
        {
            string msg = socket.receiveText();
            synchronized( incomingBuffersMutex )
            {
                incomingBuffers ~= msg[];
            }
        }

        // Unsynchronized peek; the length is re-read under the lock below.
        if( outgoingBuffers.length > outgoingMessagesSent )
        {
            shared string[] myOutgoing;
            synchronized( outgoingBuffersMutex )
            {
                // Copy the pending entries so they can be sent without holding the lock.
                myOutgoing = outgoingBuffers[ outgoingMessagesSent..outgoingBuffers.length ].dup;
                // Update current index.
                outgoingMessagesSent = outgoingBuffers.length;
            }

            // And send them.
            foreach( buf; myOutgoing )
            {
                socket.send( buf );
            }
        }
    }
}