Tuesday, November 1, 2011

Advanced DataSnap: callbacks revisited, this time over REST

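XE2 ships with a REST-based example called ChatRoomDemo. To make it work, the client-side JS calls code on the server, which is how the chat participants talk to each other.
The server-side code: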
function TChatRoomServerMethods.SendMessage(const Msg: String): Boolean;
var
  MesgTrimmed: String;
  Session: TDSSession;
  JSONMsg: TJSONObject;
begin
  MesgTrimmed := Trim(Msg);

  //no message to send, so just exit
  if Msg = EmptyStr then
    Exit(false);

  //get the current session
  Session := TDSSessionManager.GetThreadSession;

  //if the session is invalid, don't send a message
  if (Session = nil) or (not TChatRoomUsers.Instance.UserExists(Session.UserName)) then
    Exit(false);

  //wrap the message in a JSON object
  JSONMsg := TJSONObject.Create;
  JSONMsg.AddPair(TJSONPair.Create('notificationType', 'message'));
  JSONMsg.AddPair(TJSONPair.Create('from', Session.UserName));
  JSONMsg.AddPair(TJSONPair.Create('message', GetHTMLEscapedString(MesgTrimmed)));

  //Send the message to all logged in users
  Result := ServerContainerForm.ChatRoomServer.BroadcastMessage(CHAT_ROOM_ID, JSONMsg);
end;

function TChatRoomServerMethods.SendMessageToUser(const Msg, UserName: String): Boolean;
var
  MesgTrimmed: String;
  Session: TDSSession;
  JSONMsg: TJSONObject;
  Resp: TJSONValue;
begin
  MesgTrimmed := Trim(Msg);

  //no message to send, so just exit
  if Msg = EmptyStr then
    Exit(false);

  //no user to send message to
  if not TChatRoomUsers.Instance.UserExists(UserName) then
    Exit(false);

  //get the current session
  Session := TDSSessionManager.GetThreadSession;

  //if the session is invalid, don't send a message
  if (Session = nil) or (not TChatRoomUsers.Instance.UserExists(Session.UserName)) then
    Exit(false);

  //don't message yourself!
  if AnsiCompareText(Session.UserName, UserName) = 0 then
    Exit(false);

  //wrap the message in a JSON object
  JSONMsg := TJSONObject.Create;
  JSONMsg.AddPair(TJSONPair.Create('notificationType', 'privatemessage'));
  JSONMsg.AddPair(TJSONPair.Create('from', Session.UserName));
  JSONMsg.AddPair(TJSONPair.Create('message', GetHTMLEscapedString(MesgTrimmed)));

  //Send the message only to the specified user's callback
  Result := ServerContainerForm.ChatRoomServer.NotifyCallback(UserName, UserName, JSONMsg, Resp);

  //we don't care about the response message from the other client, only if it was successfully sent
  FreeAndNil(Resp);
end;
Two functions are used here: DSServer.BroadcastMessage and DSServer.NotifyCallback. The question now is how these two functions actually push data to the client.
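To make the pushed data concrete, here is a minimal JavaScript sketch of what the two payloads might look like once parsed on the client. It assumes the TJSONObject built above is serialized as a plain JSON object holding the three pairs; buildChatPayload and the sample user names are hypothetical and exist only for illustration.

```javascript
// Hypothetical helper that mirrors the three AddPair calls in the Delphi code above.
// It is NOT part of DataSnap; it only models the shape of the payload.
function buildChatPayload(notificationType, from, message) {
  return { notificationType: notificationType, from: from, message: message };
}

// SendMessage builds a payload tagged 'message' (sent via BroadcastMessage).
var publicMsg = buildChatPayload("message", "alice", "hello everyone");

// SendMessageToUser builds one tagged 'privatemessage' (sent via NotifyCallback).
var privateMsg = buildChatPayload("privatemessage", "alice", "hi bob");

// What would travel over the wire, assuming plain JSON serialization.
console.log(JSON.stringify(publicMsg));
console.log(JSON.stringify(privateMsg));
```

The two payloads differ only in notificationType, which is the field the client-side callback later uses to tell a public message from a private one.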

On the JS client, startChannel() creates a ClientChannel and a ClientCallback. ClientChannel.connect(ClientCallback) then creates a CallbackLoop, and this CallbackLoop is the long-lived connection, built on top of XMLHttpRequest.
Looking at the client-side CallbackFramework.js, here is the start function of CallbackLoop:
  /*
   * Starts the loop, registering the client callback on the server and the initial client callback specified
   * @param firstCallback the first callback to register, as you can't register a client with the server without specifying the first callback
   */
  this.start = function(firstCallback) {
    if (this.stopped && (!nullOrEmptyStr(this.clientChannel) || firstCallback.serverChannelNames.length > 0))
    {
      this.stopped = false;
    
      //passes empty string for the ConsumeClientChannel last parameter, since this is initiating the channel, and has no value
      //passes true after the callback to say a response from the server is expected
      this.executor.executeMethod("ConsumeClientChannel", "GET", 
                             [this.clientChannel.serverChannelName, this.clientChannel.channelId, 
                              firstCallback.callbackId, arrayToCSV(firstCallback.serverChannelNames), this.securityToken, ""],
                              this.callback, true);

      if (isReferenceAFunction(this.clientChannel.onChannelStateChange)) {
        this.clientChannel.onChannelStateChange(new ClientChannelEventItem(this.clientChannel.EVENT_CHANNEL_START,
                                                                           this.clientChannel, firstCallback));
      }
    }
  };

On the server, this runs the following code in the Datasnap.DSPlatform unit:
function TDBXServerComponent.ConsumeClientChannel(const ChannelName, ClientManagerId,
  CallbackId, ChannelNames, SecurityToken: String; ResponseData: TJSONValue): TJSONValue;
begin
  Result := ConsumeClientChannelTimeout(ChannelName, ClientManagerId, CallbackId, ChannelNames,
                                        SecurityToken, -1, ResponseData);
end;
Pay attention to the this.callback argument passed to the JS executeMethod function.
    /*
   * This function executes the given method with the specified parameters and then
   * notifies the callback when a response is received.
   * @param url the url to invoke
   * @param contentParam the parameter to pass through the content of the request (or null)
   * @param requestType must be one of: GET, POST, PUT, DELETE
   * @param callback An optional function with three parameters, the response object, the request's status (IE: 200) and the specified 'owner'
   *                 The object will be an array, which can contain string, numeric, JSON array or JSON object types.
   * @param hasResult true if a result from the server call is expected, false to ignore any result returned.
   *                  This is an optional parameter and defaults to 'true'
   * @param accept The string value to set for the Accept header of the HTTP request, or null to set as application/json
   * @return if callback in null then this function will return the result that would have 
   *         otherwise been passed to the callback
   */
  this.executeMethodURL = function(url, contentParam, requestType, callback, hasResult, accept) {
    if (hasResult == null)
    {
      hasResult = true;
    }
    
    requestType = validateRequestType(requestType);

    var request = getXmlHttpObject();  //obtain the XMLHttpRequest object

    //async is only true if there is a callback that can be notified on completion
    var useCallback = (callback != null);
    request.open(requestType, url, useCallback);

    if (useCallback)
    {
      request.onreadystatechange = function() {  //register the completion handler
        if (request.readyState == 4)
        {
          //the callback will be notified the execution finished even if there is no expected result
          JSONResult = hasResult ? parseHTTPResponse(request) : null;
          callback(JSONResult, request.status, owner);  //invoke the callback, i.e. the this.callback passed to executeMethod
        }
      };
    }

    if(contentParam != null)
    {
      contentParam = JSON.stringify(contentParam);
    }

    request.setRequestHeader("Accept", (accept == null ? "application/json" : accept));
    request.setRequestHeader("Content-Type", "text/plain;charset=UTF-8");
    request.setRequestHeader("If-Modified-Since", "Mon, 1 Oct 1990 05:00:00 GMT");
    
    var sessId = getSessionID();
    if(sessId != null)
    {
      request.setRequestHeader("Pragma", "dssession=" + sessId);
    }
    if (this.authentication != null)
    {
      request.setRequestHeader("Authorization", "Basic " + this.authentication);
    }
    request.send(contentParam);  //send the request

    //if a callback wasn't used then simply return the result.
    //otherwise, return nothing because this function will finish executing before
    //the server call returns, so the result text will be empty until it is passed to the callback
    if (hasResult && !useCallback)
    {
      return parseHTTPResponse(request);
    }
  };
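Up to this point there is still no sign of a long-lived connection: the XMLHttpRequest object sends its request, gets the result, and is done. So where is the long-lived connection implemented?
Read on, into the code of CallbackLoop.callback: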
  /*
   * The callback which will handle a value passed in from the server and then pass
   * back a response to the server as long as the channel is active.
   */
  this.callback = function(responseObject, requestStatus, callbackLoop) {
    if (callbackLoop != null && !callbackLoop.stopped && responseObject != null)
    {
      //resolve the true response object
      responseObject = (responseObject.result != null) ? responseObject.result : responseObject;
      responseObject = isArray(responseObject) ? responseObject[0] : responseObject;
      
      //if the session this callback was created on has since expired then stop the callback loop,
      //preventing any calls to callbackLoop.sendResponse from executing
      var sessId = getSessionID();
      if (sessId == null)
      {
        callbackLoop.stopped = true;
      }
      
      //session expired, so notify local callbacks and then stop the loop
      if (responseObject.SessionExpired != null)
      {
        callbackLoop.stopped = true; 
        for(var i = 0; i < clientChannel.callbacks.length; i++)
        {
          clientChannel.callbacks[i].notifyCallback(responseObject);
        }
        //notify that the channel has been closed
        if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
          clientChannel.onChannelStateChange(new ClientChannelEventItem(clientChannel.EVENT_SERVER_DISCONNECT,
                                                                        clientChannel, null));
        }
      }
      //broadcast to all of the callbacks listening on the given channel
      else if (responseObject.broadcast != null) //broadcast
      {
        var paramArray = responseObject.broadcast;
        var paramValue = paramArray[0];
 
        //used to determine if the paramValue is (on the server) a JSONValue or a TObject
        var dataType = paramArray[1];

        var broadcastChannel = responseObject.channel == null ? clientChannel.serverChannelName : responseObject.channel;
        var doForAll = clientChannel.serverChannelName == broadcastChannel;

        for(var i = 0; i < clientChannel.callbacks.length; i++)
        {
          var currentCallback = clientChannel.callbacks[i];

          //Broadcast to the callback if the channel being broadcast to is the one specified in the ClientChannel,
          //or if it appears in the array of channels this specific callback cares about.
          if (doForAll || arrayIndexOf(currentCallback.serverChannelNames, broadcastChannel) > -1) {
            currentCallback.notifyCallback(paramValue, dataType);
          }
        }
        callbackLoop.sendResponse(true, callbackLoop);
      }
      //Invoke the specified callback
      else if (responseObject.invoke != null)  //invoke one specific callback
      {
        var paramArray = responseObject.invoke;
        var callbackKey = paramArray[0];
        var paramValue = paramArray[1];
 
        //used to determine if the paramValue is (on the server) a JSONValue or a TObject
        var dataType = paramArray[2];

        var currCallback;
        for(var i = 0; i < clientChannel.callbacks.length; i++)
        {
          currCallback = clientChannel.callbacks[i];
 
          if (currCallback.callbackId == callbackKey)
          {
            callbackLoop.sendResponse(currCallback.notifyCallback(paramValue, dataType), callbackLoop);
            break;
          }
        }
      }
      //if an error has occurred notify the callbacks and stop the loop
      else if (responseObject.error != null)
      {
        callbackLoop.stopped = true;
        for(var i = 0; i < clientChannel.callbacks.length; i++)
        {
          clientChannel.callbacks[i].notifyCallback(responseObject, "error");
        }
        //notify that the channel has been closed by
        if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
          clientChannel.onChannelStateChange(new ClientChannelEventItem(this.clientChannel.EVENT_SERVER_DISCONNECT,
                                                                             this.clientChannel, null));
        }
      }
      //If the result key is 'close' or 'closeChannel' then no response should be sent, which means
      //the recursion of this loop will end. Otherwise, send a response to the server with
      //a value of false so the loop will continue and the server will know the invocation failed
      else if (responseObject.closeChannel == null && responseObject.close == null)
      {
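        //This is the key: as long as the channel is not closed, another request goes out,
        //over and over, which is what makes this a callback loop. sendResponse also calls
        //DSAdmin.ConsumeClientChannel, but with POST rather than GET.
        callbackLoop.sendResponse(false, callbackLoop);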
      }
      else
      {
        callbackLoop.stopped = true;
        
        //notify each callback that it has been closed
        for(var i = 0; i < clientChannel.callbacks.length; i++)
        {
          clientChannel.callbacks[i].notifyCallback(responseObject, "closed");
        }
        
        //notify that the channel has been closed
        if (isReferenceAFunction(clientChannel.onChannelStateChange)) {
          clientChannel.onChannelStateChange(new ClientChannelEventItem(clientChannel.EVENT_CHANNEL_STOP,
                                                                        clientChannel, null));
        }
      }
    }
    else
    {
      if (callbackLoop != null) {
        if (!callbackLoop.stopped && isReferenceAFunction(clientChannel.onChannelStateChange)) {
          //notify that the channel has been closed by the server
          clientChannel.onChannelStateChange(new ClientChannelEventItem(clientChannel.EVENT_SERVER_DISCONNECT,
                                                                        clientChannel, null));
        }
        callbackLoop.stopped = true;
      }
    }
  };
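
The broadcast branch in the listing above decides which callbacks get notified. The rule can be isolated into a small sketch: if the broadcast targets the channel the ClientChannel itself is bound to, every callback is notified; otherwise only the callbacks that list that channel in serverChannelNames are. selectBroadcastTargets, the channel names, and the callback ids are hypothetical names used for illustration, not part of CallbackFramework.js.

```javascript
// Returns the ids of the callbacks that should receive a broadcast.
// channelDefault:  the server channel name the ClientChannel is bound to.
// callbacks:       objects shaped like { callbackId, serverChannelNames }.
// envelopeChannel: the 'channel' field of the response, or null if absent.
function selectBroadcastTargets(channelDefault, callbacks, envelopeChannel) {
  // A missing channel means "the channel this ClientChannel listens on".
  var broadcastChannel = envelopeChannel == null ? channelDefault : envelopeChannel;
  var doForAll = channelDefault === broadcastChannel;

  return callbacks
    .filter(function (cb) {
      return doForAll || cb.serverChannelNames.indexOf(broadcastChannel) > -1;
    })
    .map(function (cb) { return cb.callbackId; });
}

var sampleCallbacks = [
  { callbackId: "alice", serverChannelNames: [] },
  { callbackId: "news", serverChannelNames: ["announcements"] }
];

// Broadcast on the channel's own name: everyone is notified.
console.log(selectBroadcastTargets("chatroom", sampleCallbacks, null));
// Broadcast on another channel: only callbacks subscribed to it are notified.
console.log(selectBroadcastTargets("chatroom", sampleCallbacks, "announcements"));
```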

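In other words, server-to-client push is really implemented by the JS client polling over and over. There is little else one can do: HTTP requests are stateless, so this is the way it has to work. Also, the so-called long-lived connection is not a single connection at all, because every round of polling is a request issued by a fresh XMLHttpRequest.
In the ChatRoom example, the notifyCallback calls above end up in the function passed as the third argument to ClientCallback in the code below: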
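
The request-per-message cycle can be sketched without any DataSnap code at all. The toy below assumes only what the listing shows: responses arrive wrapped as {result: [envelope]}, an envelope carries broadcast, invoke, or close/closeChannel, and the client issues a new request after each handled envelope. makeFakeServer and runCallbackLoop are hypothetical names, the data type marker is a placeholder, and the real framework answers through sendResponse (a POST) rather than a bare re-request.

```javascript
// Toy stand-in for the server: each request is answered with the next queued envelope.
function makeFakeServer(envelopes) {
  var queue = envelopes.slice();
  var requestCount = 0;
  return {
    // Simulates one HTTP round trip: a fresh request in, one response out.
    request: function (onResponse) {
      requestCount += 1;
      var next = queue.length > 0 ? queue.shift() : { close: true };
      onResponse({ result: [next] });
    },
    requestCount: function () { return requestCount; }
  };
}

function runCallbackLoop(server, onMessage) {
  var stopped = false;

  function handle(response) {
    // Unwrap {result: [envelope]} the same way CallbackLoop.callback does.
    var envelope = response.result != null ? response.result : response;
    envelope = Array.isArray(envelope) ? envelope[0] : envelope;

    if (envelope.broadcast != null) {
      onMessage(envelope.broadcast[0]);   // [value, dataType]
    } else if (envelope.invoke != null) {
      onMessage(envelope.invoke[1]);      // [callbackId, value, dataType]
    } else if (envelope.close != null || envelope.closeChannel != null) {
      stopped = true;                     // no further request: the loop ends
    }

    if (!stopped) {
      server.request(handle);             // the "loop": one more request per handled envelope
    }
  }

  server.request(handle);                 // the initial request that opens the channel
}

var received = [];
var fakeServer = makeFakeServer([
  { broadcast: [{ notificationType: "message", from: "alice", message: "hi" }, "placeholder-type"] },
  { invoke: ["bob", { notificationType: "privatemessage", from: "alice", message: "psst" }, "placeholder-type"] },
  { close: true }
]);
runCallbackLoop(fakeServer, function (value) { received.push(value.notificationType); });
console.log(received, fakeServer.requestCount());
```

Every delivered message costs one request, and the close envelope is the only thing that stops the client from asking again.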
  //create the messaging callback, to handle messages from the server and from other clients
  var callback = new ClientCallback(channel, userName, function(jsonValue, dataType) {
    if (jsonValue != null)
    {
      if (dataType == "closed")
      {
        addMessage(null, "You have been disconnected from the server. Sorry!", null, true);
      }
      else if (jsonValue.notificationType != null)
      {
        var type = jsonValue.notificationType;
      
        //the list of users has changed, so update it on the page
        if (type == "user_login" || type == "user_logout")
        {
          loadUsers();
        }
        //you received a public or private message, so add it to the message area
        else if (type == "message" || type == "privatemessage")
        {
          var isPrivate = type == "privatemessage";
          var from = jsonValue.from;
          var message = jsonValue.message;
 
          addMessage(from, message, isPrivate);
        }
      }
      //your session has expired!
      else if(jsonValue.SessionExpired != null)
      {
        addMessage(null, jsonValue.SessionExpired, null, true);

        //NOTE: you don't need to call stopChannel here, because the session has expired and therefore
        //this is the last message you will receive before the tunnel closes.
      }
    }
    return true;
  });
At this point we have our hands on the const Msg: TJSONValue argument that was passed to TDSServer.NotifyCallback and TDSServer.BroadcastMessage, and can do whatever we need with it.

2 comments:

  1. Finally found someone digging into DataSnap, and it turns out to be outside the Great Firewall.

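  2. A question for discussion: I have no confidence in how much load a DataSnap server written in Delphi can take. Would it be possible to emulate a DataSnap server in some other, more open language (Java or PHP, for example), one that passes and returns data the same way a real DataSnap server does, while the client stays in Delphi? The server could then be cross-platform and could draw on the wide range of open-source resources available for PHP and Java. Even a partial emulation would do: just calling server functions and emulating DataSetProvider.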
