客户端调用RegisterCallback,其实就是开始了一个线程TDSChannelThread,该线程起一个dbxconnection,连接到服务器上,执行DSAdmin.ConnectClientChannel,服务器上的这个DSAdmin.ConnectClientChannel很神奇,所有的数据传输都在这里了,这个连接不会关闭,以做到服务器往客户端push数据。TDSClientCallbackChannelManager只能做到服务器向客户端推送数据,单向的。客户端要向服务器送数据,走TDSAdminClient的NotifyCallback方法,也就是只有经过SQLConnection。
DSAdmin(在Datasnap.DSCommonServer单元)是TDBXServerComponent(在Datasnap.DSPlatform单元)的子类,ConnectClientChannel函数直接call ConsumeAllClientChannel。
代码摘抄
function TDBXServerComponent.ConsumeAllClientChannel(const ChannelName,
ChannelId, CallbackId, SecurityToken: String; ChannelNames: TStringList;
ChannelCallback: TDBXCallback; Timeout: Cardinal): Boolean;
...
begin
// wait for exit message
repeat //这里开始
Data := nil;
IsBroadcast := false;
ArgType := TDBXCallback.ArgJson;
QueueMessage := nil;
TMonitor.Enter(CallbackTunnel.Queue);
try
{Wait for a queue item to be added if the queue is empty, otherwise
don't wait and just pop the next queue item}
if CallbackTunnel.Queue.QueueSize = 0 then
IsAquired := TMonitor.Wait(CallbackTunnel.Queue, Timeout)
else
IsAquired := true;
if IsAquired and (CallbackTunnel.Queue.QueueSize > 0) then
begin
{Get the next queued item from the tunnel}
QueueMessage := CallbackTunnel.Queue.PopItem;
Data := QueueMessage.Msg;
IsBroadcast := QueueMessage.IsBroadcast;
ArgType := QueueMessage.ArgType;
end;
finally
TMonitor.Exit(CallbackTunnel.Queue);
end;
if IsAquired and (Data <> nil) then
if IsBroadcast then
begin
try
Msg := TJSONObject.Create(TJSONPair.Create('broadcast',
TJSONArray.Create(Data).Add(ArgType)));
if (QueueMessage.ChannelName <> EmptyStr) and
(QueueMessage.ChannelName <> CallbackTunnel.ServerChannelName) then
Msg.AddPair(TJSONPair.Create('channel', QueueMessage.ChannelName));
try
ChannelCallback.Execute(Msg).Free;
except
try
// Remove the callback tunnel from the list, it will be freed at the end of this method
InternalRemoveCallbackTunnel(DSServer, CallbackTunnel);
except
end;
raise;
end;
finally
QueueMessage.InstanceOwner := false;
FreeAndNil(QueueMessage);
end;
end
else if Assigned(QueueMessage) then
begin
TMonitor.Enter(QueueMessage);
try
Msg := TJSONObject.Create( TJSONPair.Create('invoke',
TJSONArray.Create( TJSONString.Create(QueueMessage.CallbackId),
Data).Add(ArgType)));
try
QueueMessage.Response := ChannelCallback.Execute(Msg);
except
on E : Exception do
begin
QueueMessage.IsError := True;
QueueMessage.Response := TJSONObject.Create(TJSONPair.Create('error', E.Message));
if ChannelCallback.ConnectionLost then
begin
WasConnectionLost := True;
TMonitor.Pulse(QueueMessage);
try
// Remove the callback tunnel from the list, it will be freed at the end of this method
InternalRemoveCallbackTunnel(DSServer, CallbackTunnel);
except
end;
Break;
end;
end;
end;
TMonitor.Pulse(QueueMessage);
finally
TMonitor.Exit(QueueMessage);
end;
end
until (not IsAquired) or (Data = nil); //这里结束
...
end.
客户端调用UnregisterCallback,调用的线程(一般就是主线程),直接起一个dbxconnection,让服务器执行DSAdmin.UnregisterClientCallback,执行后立刻dbxconnection.close, 服务器执行UnregisterClientCallback只是剔除消息筛选;如果客户端没有订阅别的Callback,就再次起一个dbxconnection,让服务器执行DSAdmin.CloseClientChannel,服务器的CloseClientChannel里,会往CallbackTunnel广播一个nil的消息,这就让ConnectClientChannel的repeat循环也会退出(data=nil),从而关闭最开始连接。
另外, TDSClientCallbackChannelManager在界面上找不到输入认证信息的地方,比如DSAuthPassword, DSAuUser等,其实TDSClientCallbackChannelManager也好SQLConnection也好,他们都是个载体罢了,真正在后面起作用的,是TDBXProperties。监视一下DSServer的OnConnect里的DSConnectEventObject.ConnectProperties,我们就能知道TDSClientCallbackChannelManager的username, password,其实是SQLConnection的DSAuthPassword,DSAuUser。
在TDBXProperties里的键值对为
DSAuthenticationUser=
DSAuthenticationPassword=
最后,TDSClientCallbackChannelManager并没有Filters的属性,这个其实现在的客户端,就算使用SQLConnection时也不必设置Filters了,我们只要别忘记在客户端也TTransportFilterFactory.RegisterFilter一下要用的Filter即可。
没有评论:
发表评论