Server-side code
    function TChatRoomServerMethods.SendMessage(const Msg: String): Boolean;
    var
      MesgTrimmed: String;
      Session: TDSSession;
      JSONMsg: TJSONObject;
    begin
      MesgTrimmed := Trim(Msg);

      //no message to send (empty or whitespace only), so just exit
      if MesgTrimmed = EmptyStr then
        Exit(false);

      //get the current session
      Session := TDSSessionManager.GetThreadSession;

      //if the session is invalid, don't send a message
      if (Session = nil) or (not TChatRoomUsers.Instance.UserExists(Session.UserName)) then
        Exit(false);

      //wrap the message in a JSON object
      JSONMsg := TJSONObject.Create;
      JSONMsg.AddPair(TJSONPair.Create('notificationType', 'message'));
      JSONMsg.AddPair(TJSONPair.Create('from', Session.UserName));
      JSONMsg.AddPair(TJSONPair.Create('message', GetHTMLEscapedString(MesgTrimmed)));

      //Send the message to all logged in users
      Result := ServerContainerForm.ChatRoomServer.BroadcastMessage(CHAT_ROOM_ID, JSONMsg);
    end;

    function TChatRoomServerMethods.SendMessageToUser(const Msg, UserName: String): Boolean;
    var
      MesgTrimmed: String;
      Session: TDSSession;
      JSONMsg: TJSONObject;
      Resp: TJSONValue;
    begin
      MesgTrimmed := Trim(Msg);

      //no message to send (empty or whitespace only), so just exit
      if MesgTrimmed = EmptyStr then
        Exit(false);

      //no user to send message to
      if not TChatRoomUsers.Instance.UserExists(UserName) then
        Exit(false);

      //get the current session
      Session := TDSSessionManager.GetThreadSession;

      //if the session is invalid, don't send a message
      if (Session = nil) or (not TChatRoomUsers.Instance.UserExists(Session.UserName)) then
        Exit(false);

      //don't message yourself!
      if AnsiCompareText(Session.UserName, UserName) = 0 then
        Exit(false);

      //wrap the message in a JSON object
      JSONMsg := TJSONObject.Create;
      JSONMsg.AddPair(TJSONPair.Create('notificationType', 'privatemessage'));
      JSONMsg.AddPair(TJSONPair.Create('from', Session.UserName));
      JSONMsg.AddPair(TJSONPair.Create('message', GetHTMLEscapedString(MesgTrimmed)));

      //Send the message to the specified user only
      Result := ServerContainerForm.ChatRoomServer.NotifyCallback(UserName, UserName, JSONMsg, Resp);

      //we don't care about the response message from the other client, only if it was successfully sent
      FreeAndNil(Resp);
    end;

Two functions are used here: DSServer.BroadcastMessage and DSServer.NotifyCallback. The question now is how these two functions actually push data to the client.
In its startChannel() function, the JavaScript client creates a ClientChannel and a ClientCallback. ClientChannel.connect(ClientCallback) then creates a CallbackLoop, and this CallbackLoop is the "long-lived connection", built on top of XMLHttpRequest.
In the client's CallbackFramework.js you can find the start function of CallbackLoop:
    /*
     * Starts the loop, registering the client callback on the server and the initial client callback specified
     * @param firstCallback the first callback to register, as you can't register a client with the server
     *                      without specifying the first callback
     */
    this.start = function(firstCallback) {
      if (this.stopped && (!nullOrEmptyStr(this.clientChannel) || firstCallback.serverChannelNames.length > 0)) {
        this.stopped = false;

        //passes empty string for the ConsumeClientChannel last parameter, since this is initiating the channel, and has no value
        //passes true after the callback to say a response from the server is expected
        this.executor.executeMethod("ConsumeClientChannel", "GET",
          [this.clientChannel.serverChannelName, this.clientChannel.channelId, firstCallback.callbackId,
           arrayToCSV(firstCallback.serverChannelNames), this.securityToken, ""],
          this.callback, true);

        if (isReferenceAFunction(this.clientChannel.onChannelStateChange)) {
          this.clientChannel.onChannelStateChange(
            new ClientChannelEventItem(this.clientChannel.EVENT_CHANNEL_START, this.clientChannel, firstCallback));
        }
      }
    };

On the server, this request runs the following code in the Datasnap.DSPlatform unit:
    function TDBXServerComponent.ConsumeClientChannel(const ChannelName, ClientManagerId, CallbackId,
      ChannelNames, SecurityToken: String; ResponseData: TJSONValue): TJSONValue;
    begin
      Result := ConsumeClientChannelTimeout(ChannelName, ClientManagerId, CallbackId, ChannelNames,
        SecurityToken, -1, ResponseData);
    end;

The thing to watch is the this.callback argument passed to the JavaScript executeMethod function.
    /*
     * This function executes the given method with the specified parameters and then
     * notifies the callback when a response is received.
     * @param url the url to invoke
     * @param contentParam the parameter to pass through the content of the request (or null)
     * @param requestType must be one of: GET, POST, PUT, DELETE
     * @param callback An optional function with three parameters, the response object, the request's status (IE: 200)
     *                 and the specified 'owner'.
     *                 The object will be an array, which can contain string, numeric, JSON array or JSON object types.
     * @param hasResult true if a result from the server call is expected, false to ignore any result returned.
     *                  This is an optional parameter and defaults to 'true'
     * @param accept The string value to set for the Accept header of the HTTP request, or null to set as application/json
     * @return if callback is null then this function will return the result that would have
     *         otherwise been passed to the callback
     */
    this.executeMethodURL = function(url, contentParam, requestType, callback, hasResult, accept) {
      if (hasResult == null) {
        hasResult = true;
      }

      requestType = validateRequestType(requestType);

      var request = getXmlHttpObject(); //obtain the XMLHttpRequest object

      //async is only true if there is a callback that can be notified on completion
      var useCallback = (callback != null);
      request.open(requestType, url, useCallback);

      if (useCallback) {
        request.onreadystatechange = function() { //register the completion handler
          if (request.readyState == 4) {
            //the callback will be notified the execution finished even if there is no expected result
            JSONResult = hasResult ? parseHTTPResponse(request) : null;
            callback(JSONResult, request.status, owner); //run the callback, i.e. the this.callback given to executeMethod
          }
        };
      }

      if (contentParam != null) {
        contentParam = JSON.stringify(contentParam);
      }

      request.setRequestHeader("Accept", (accept == null ? "application/json" : accept));
      request.setRequestHeader("Content-Type", "text/plain;charset=UTF-8");
      request.setRequestHeader("If-Modified-Since", "Mon, 1 Oct 1990 05:00:00 GMT");

      var sessId = getSessionID();
      if (sessId != null) {
        request.setRequestHeader("Pragma", "dssession=" + sessId);
      }
      if (this.authentication != null) {
        request.setRequestHeader("Authorization", "Basic " + this.authentication);
      }

      request.send(contentParam); //send the request

      //if a callback wasn't used then simply return the result.
      //otherwise, return nothing because this function will finish executing before
      //the server call returns, so the result text will be empty until it is passed to the callback
      if (hasResult && !useCallback) {
        return parseHTTPResponse(request);
      }
    };

So far there is still no sign of a long-lived connection: the XMLHttpRequest object sends a request, receives the result, and is done. So where is the long-lived connection implemented?
Read on, to the code of CallbackLoop.callback:
    /*
     * The callback which will handle a value passed in from the server and then pass
     * back a response to the server as long as the channel is active.
     */
    this.callback = function(responseObject, requestStatus, callbackLoop) {
      if (callbackLoop != null && !callbackLoop.stopped && responseObject != null) {
        //resolve the true response object
        responseObject = (responseObject.result != null) ? responseObject.result : responseObject;
        responseObject = isArray(responseObject) ? responseObject[0] : responseObject;

        //if the session this callback was created on has since expired then stop the callback loop,
        //preventing any calls to callbackLoop.sendResponse from executing
        var sessId = getSessionID();
        if (sessId == null) {
          callbackLoop.stopped = true;
        }

        //session expired, so notify local callbacks and then stop the loop
        if (responseObject.SessionExpired != null) {
          callbackLoop.stopped = true;
          for (var i = 0; i < clientChannel.callbacks.length; i++) {
            clientChannel.callbacks[i].notifyCallback(responseObject);
          }
          //notify that the channel has been closed
          if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
            clientChannel.onChannelStateChange(
              new ClientChannelEventItem(clientChannel.EVENT_SERVER_DISCONNECT, clientChannel, null));
          }
        }
        //broadcast to all of the callbacks listening on the given channel
        else if (responseObject.broadcast != null) { //broadcast
          var paramArray = responseObject.broadcast;
          var paramValue = paramArray[0];
          //used to determine if the paramValue is (on the server) a JSONValue or a TObject
          var dataType = paramArray[1];

          var broadcastChannel = responseObject.channel == null ? clientChannel.serverChannelName : responseObject.channel;
          var doForAll = clientChannel.serverChannelName == broadcastChannel;

          for (var i = 0; i < clientChannel.callbacks.length; i++) {
            var currentCallback = clientChannel.callbacks[i];

            //Broadcast to the callback if the channel being broadcast to is the one specified in the ClientChannel,
            //or if it appears in the array of channels this specific callback cares about.
            if (doForAll || arrayIndexOf(currentCallback.serverChannelNames, broadcastChannel) > -1) {
              currentCallback.notifyCallback(paramValue, dataType);
            }
          }
          callbackLoop.sendResponse(true, callbackLoop);
        }
        //Invoke the specified callback
        else if (responseObject.invoke != null) { //trigger one specific callback
          var paramArray = responseObject.invoke;
          var callbackKey = paramArray[0];
          var paramValue = paramArray[1];
          //used to determine if the paramValue is (on the server) a JSONValue or a TObject
          var dataType = paramArray[2];

          var currCallback;
          for (var i = 0; i < clientChannel.callbacks.length; i++) {
            currCallback = clientChannel.callbacks[i];

            if (currCallback.callbackId == callbackKey) {
              callbackLoop.sendResponse(currCallback.notifyCallback(paramValue, dataType), callbackLoop);
              break;
            }
          }
        }
        //if an error has occurred notify the callbacks and stop the loop
        else if (responseObject.error != null) {
          callbackLoop.stopped = true;
          for (var i = 0; i < clientChannel.callbacks.length; i++) {
            clientChannel.callbacks[i].notifyCallback(responseObject, "error");
          }
          //notify that the channel has been closed by the server
          if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
            clientChannel.onChannelStateChange(
              new ClientChannelEventItem(this.clientChannel.EVENT_SERVER_DISCONNECT, this.clientChannel, null));
          }
        }
        //If the result key is 'close' or 'closeChannel' then no response should be sent, which means
        //the recursion of this loop will end. Otherwise, send a response to the server with
        //a value of false so the loop will continue and the server will know the invocation failed
        else if (responseObject.closeChannel == null && responseObject.close == null) {
          //This is the spot: as long as the channel is not closed, another request goes out,
          //over and over, and that is what makes it a callback loop. sendResponse also calls
          //DSAdmin.ConsumeClientChannel, but with POST instead of GET.
          callbackLoop.sendResponse(false, callbackLoop);
        }
        else {
          callbackLoop.stopped = true;
          //notify each callback that it has been closed
          for (var i = 0; i < clientChannel.callbacks.length; i++) {
            clientChannel.callbacks[i].notifyCallback(responseObject, "closed");
          }
          //notify that the channel has been closed
          if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
            clientChannel.onChannelStateChange(
              new ClientChannelEventItem(clientChannel.EVENT_CHANNEL_STOP, clientChannel, null));
          }
        }
      }
      else {
        if (callbackLoop != null) {
          if (!callbackLoop.stopped && isReferenceAFunction(clientChannel.onChannelStateChange)) {
            //notify that the channel has been closed by the server
            clientChannel.onChannelStateChange(
              new ClientChannelEventItem(clientChannel.EVENT_SERVER_DISCONNECT, clientChannel, null));
          }
          callbackLoop.stopped = true;
        }
      }
    };

In other words, the server "pushes" information to the client only because the JavaScript client keeps polling it, a pattern commonly called long polling. There is no way around this: HTTP is a stateless request/response protocol, so this is the only option. Note also that the so-called long-lived connection is not a single connection, because every poll is a request issued by a new XMLHttpRequest.
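The loop described above can be reduced to a minimal sketch. Everything in it is hypothetical and is not DataSnap code: FakeServer is an in-memory stand-in for the server side of ConsumeClientChannel that parks each request until there is something to deliver, and PollLoop plays the role of CallbackLoop. The payload keys broadcast and close are borrowed from the listing above. The sketch only shows why each delivered message costs one request and why the loop ends when a close arrives.

```javascript
// Hypothetical in-memory stand-in for the server: it parks each incoming
// request and completes it only when push() supplies a payload.
function FakeServer() {
  this.pending = null;     // completion handler of the request currently held open
  this.requestCount = 0;   // how many separate requests the client has issued
}

FakeServer.prototype.consume = function (previousResponse, onComplete) {
  this.requestCount++;
  this.pending = onComplete;
};

// Returns false when no request is waiting, i.e. the client stopped polling.
FakeServer.prototype.push = function (payload) {
  if (this.pending === null) {
    return false;
  }
  var complete = this.pending;
  this.pending = null;
  complete(payload);
  return true;
};

// Hypothetical counterpart of CallbackLoop: every completed request
// immediately triggers the next one, unless the server asked to close.
function PollLoop(server, onMessage) {
  this.server = server;
  this.onMessage = onMessage;
  this.stopped = true;
}

PollLoop.prototype.start = function () {
  this.stopped = false;
  this.request("");        // first request carries no previous response
};

PollLoop.prototype.request = function (previousResponse) {
  var self = this;
  this.server.consume(previousResponse, function (payload) {
    if (self.stopped) {
      return;
    }
    if (payload.close != null) {
      self.stopped = true; // no follow-up request, so the loop ends here
      return;
    }
    var handled = self.onMessage(payload.broadcast);
    self.request(handled); // the handler's result rides along on the next poll
  });
};

var demoServer = new FakeServer();
var demoReceived = [];
var demoLoop = new PollLoop(demoServer, function (message) {
  demoReceived.push(message);
  return true;
});

demoLoop.start();
demoServer.push({ broadcast: "hello" });
demoServer.push({ broadcast: "world" });
demoServer.push({ close: true });

console.log(demoReceived.join(","), demoServer.requestCount); // prints "hello,world 3"
```

Two messages plus one close cost three requests, which matches the observation that the "connection" is really a chain of separate requests.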
In the ChatRoom example, the notifyCallback call above invokes the function passed as the third argument to ClientCallback in the code below:
    //create the messaging callback, to handle messages from the server and from other clients
    var callback = new ClientCallback(channel, userName, function(jsonValue, dataType) {
      if (jsonValue != null) {
        if (dataType == "closed") {
          addMessage(null, "You have been disconnected from the server. Sorry!", null, true);
        }
        else if (jsonValue.notificationType != null) {
          var type = jsonValue.notificationType;

          //the list of users has changed, so update it on the page
          if (type == "user_login" || type == "user_logout") {
            loadUsers();
          }
          //you received a public or private message, so add it to the message area
          else if (type == "message" || type == "privatemessage") {
            var isPrivate = type == "privatemessage";
            var from = jsonValue.from;
            var message = jsonValue.message;
            addMessage(from, message, isPrivate);
          }
        }
        //your session has expired!
        else if (jsonValue.SessionExpired != null) {
          addMessage(null, jsonValue.SessionExpired, null, true);
          //NOTE: you don't need to call stopChannel here, because the session has expired and therefore
          //this is the last message you will receive before the tunnel closes.
        }
      }
      return true;
    });

At this point we have in hand the const Msg: TJSONValue argument that was passed to TDSServer.NotifyCallback and TDSServer.BroadcastMessage earlier, and can do whatever we need with it.
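To make the path from raw response to jsonValue concrete, here is a small sketch that repeats the unwrapping steps of CallbackLoop.callback in isolation. The helper name unwrapResponse is hypothetical. The sample response is an assumption pieced together from the keys the listing reads (result, broadcast, invoke) and from the fields the Delphi code puts into JSONMsg. It is not a captured server reply, and the dataType value 1 is a placeholder.

```javascript
// Hypothetical helper mirroring the first steps of CallbackLoop.callback:
// take "result" if present, take the first array element, then look at the key.
function unwrapResponse(response) {
  var obj = (response.result != null) ? response.result : response;
  obj = Array.isArray(obj) ? obj[0] : obj;

  if (obj.broadcast != null) {
    // broadcast payload: [value, dataType]
    return { kind: "broadcast", value: obj.broadcast[0], dataType: obj.broadcast[1] };
  }
  if (obj.invoke != null) {
    // invoke payload: [callbackId, value, dataType]
    return {
      kind: "invoke",
      callbackId: obj.invoke[0],
      value: obj.invoke[1],
      dataType: obj.invoke[2]
    };
  }
  return { kind: "other", value: obj };
}

// Assumed shape of a broadcast carrying the JSONMsg built by SendMessage.
var sampleResponse = {
  result: [
    { broadcast: [{ notificationType: "message", from: "alice", message: "hi" }, 1] }
  ]
};

var unwrapped = unwrapResponse(sampleResponse);
console.log(unwrapped.kind, unwrapped.value.from, unwrapped.value.message); // prints "broadcast alice hi"
```

The value field is what reaches the ClientCallback function as jsonValue, so its notificationType, from and message fields are the ones set on the server.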
I finally found someone digging into DataSnap, and it turns out to be outside the Great Firewall.
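A question for discussion: I have no confidence in how well a DataSnap server written in Delphi holds up under load. Would it be possible to emulate a DataSnap server in another, more open language (for example Java or PHP), passing and returning data the same way a DataSnap server does, while the client stays in Delphi? The server could then run cross-platform and draw on the broad open-source ecosystems of PHP and Java. Even a partial emulation would do, as long as it covers calling server functions and emulating DataSetProvider.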