2011年10月18日星期二

datasnap的初步 TDSClientCallbackChannelManager的工作方式

理解一下TDSClientCallbackChannelManager的工作方式吧

客户端调用RegisterCallback,其实就是开始了一个线程TDSChannelThread,该线程起一个dbxconnection,连接到服务器上,执行DSAdmin.ConnectClientChannel,服务器上的这个DSAdmin.ConnectClientChannel很神奇,所有的数据传输都在这里了,这个连接不会关闭,以做到服务器往客户端push数据。TDSClientCallbackChannelManager只能做到服务器向客户端推送数据,单向的。客户端要向服务器送数据,走TDSAdminClient的NotifyCallback方法,也就是只有经过SQLConnection。

DSAdmin(在Datasnap.DSCommonServer单元)是TDBXServerComponent(在Datasnap.DSPlatform单元)的子类,ConnectClientChannel函数直接call ConsumeAllClientChannel。
代码摘抄
function TDBXServerComponent.ConsumeAllClientChannel(const ChannelName,
  ChannelId, CallbackId, SecurityToken: String; ChannelNames: TStringList;
  ChannelCallback: TDBXCallback; Timeout: Cardinal): Boolean;
... 
begin

        // wait for exit message
        repeat  //这里开始
          Data := nil;
          IsBroadcast := false;
          ArgType := TDBXCallback.ArgJson;

          QueueMessage := nil;

          TMonitor.Enter(CallbackTunnel.Queue);
          try
            {Wait for a queue item to be added if the queue is empty, otherwise
             don't wait and just pop the next queue item}
            if CallbackTunnel.Queue.QueueSize = 0 then
              IsAquired := TMonitor.Wait(CallbackTunnel.Queue, Timeout)
            else
              IsAquired := true;

            if IsAquired and (CallbackTunnel.Queue.QueueSize > 0) then
            begin
              {Get the next queued item from the tunnel}
              QueueMessage := CallbackTunnel.Queue.PopItem;
              Data := QueueMessage.Msg;
              IsBroadcast := QueueMessage.IsBroadcast;
              ArgType := QueueMessage.ArgType;
            end;
          finally
            TMonitor.Exit(CallbackTunnel.Queue);
          end;

          if IsAquired and (Data <> nil) then
            if IsBroadcast then
            begin
              try
                Msg := TJSONObject.Create(TJSONPair.Create('broadcast',
                                                          TJSONArray.Create(Data).Add(ArgType)));
                if (QueueMessage.ChannelName <> EmptyStr) and
                   (QueueMessage.ChannelName <> CallbackTunnel.ServerChannelName) then
                  Msg.AddPair(TJSONPair.Create('channel', QueueMessage.ChannelName));

                try
                  ChannelCallback.Execute(Msg).Free;
                except
                  try
                    // Remove the callback tunnel from the list, it will be freed at the end of this method
                    InternalRemoveCallbackTunnel(DSServer, CallbackTunnel);
                  except
                  end;
                  raise;
                end;
              finally
                QueueMessage.InstanceOwner := false;
                FreeAndNil(QueueMessage);
              end;
            end
            else if Assigned(QueueMessage) then
            begin
              TMonitor.Enter(QueueMessage);
              try
                Msg := TJSONObject.Create( TJSONPair.Create('invoke',
                       TJSONArray.Create( TJSONString.Create(QueueMessage.CallbackId),
                                          Data).Add(ArgType)));
                try
                  QueueMessage.Response :=  ChannelCallback.Execute(Msg);
                except
                  on E : Exception do
                  begin
                    QueueMessage.IsError := True;
                    QueueMessage.Response := TJSONObject.Create(TJSONPair.Create('error', E.Message));
                    if ChannelCallback.ConnectionLost then
                    begin
                      WasConnectionLost := True;
                      TMonitor.Pulse(QueueMessage);
                      try
                        // Remove the callback tunnel from the list, it will be freed at the end of this method
                        InternalRemoveCallbackTunnel(DSServer, CallbackTunnel);
                      except
                      end;
                      Break;
                    end;
                  end;
                end;
                TMonitor.Pulse(QueueMessage);
              finally
                TMonitor.Exit(QueueMessage);
              end;
            end
        until (not IsAquired) or (Data = nil); //这里结束
...
end.

客户端调用UnregisterCallback,调用的线程(一般就是主线程),直接起一个dbxconnection,让服务器执行DSAdmin.UnregisterClientCallback,执行后立刻dbxconnection.close, 服务器执行UnregisterClientCallback只是剔除消息筛选;

如果客户端没有订阅别的Callback,就再次起一个dbxconnection,让服务器执行DSAdmin.CloseClientChannel,服务器的CloseClientChannel里,会往CallbackTunnel广播一个nil的消息,这就让ConnectClientChannel的repeat循环也会退出(data=nil),从而关闭最开始连接。


另外, TDSClientCallbackChannelManager在界面上找不到输入认证信息的地方,比如DSAuthPassword, DSAuUser等,其实TDSClientCallbackChannelManager也好SQLConnection也好,他们都是个载体罢了,真正在后面起作用的,是TDBXProperties。监视一下DSServer的OnConnect里的DSConnectEventObject.ConnectProperties,我们就能知道TDSClientCallbackChannelManager的username, password,其实是SQLConnection的DSAuthPassword,DSAuUser。
在TDBXProperties里的键值对为
DSAuthenticationUser=
DSAuthenticationPassword=

最后,TDSClientCallbackChannelManager并没有Filters的属性,这个其实现在的客户端,就算使用SQLConnection时也不必设置Filters了,我们只要别忘记在客户端也TTransportFilterFactory.RegisterFilter一下要用的Filter即可。


没有评论:

发表评论