Classic DataSnap
Prior to Delphi 2009, we may use either TLocalConnection or TSocketConnection together with TConnectionBroker for in-process or out-of-process communication via IAppServer interface. There are even more DataSnap connection that supports IAppServer. Check Delphi helps for details.
New DataSnap from Delphi 2009
Previously, TSQLConnection was used in DataSnap server only. In new DataSnap, we may use TSQLConnection in DataSnap client. There is a new driver call DataSnap that allow us to connect to a DataSnap server either via TCP or HTTP protocol using REST data packet for multi-tier application. Furthermore, we may use connect to TDSSever (TDSServer.Name) via TSQLConnection.DriverName for in-process connection. This benefits us to write a scalable multi-tier DataSnap application to consume server methods. See here for more details.
In Delphi 2009/2010, a new DataSnap connection component – TDSProviderConnection was introduced. As the name implied, it supply providers from DataSnap server. This connection require a TSQLConnection instance to work with in client tier. Thus, we may use a single TSQLConnection in client tier either in-process or out-of-process. And that fulfill the design philosophy of scalable multi-tier DataSnap application.
There are lots of demo or CodeRage videos available in the web showing how to TDSProviderConnection in DataSnap client tier. However, most of the examples only showing out-of-process design. I never find one example illustrate the usage of TDSProviderConnection for in-process design while writing this topic. Hope there are more from other famous or well know Delphi fans.
At first, I thought it is easy to use TDSProviderConnection for in-process design. But I face problems while follow the rules. These problems should be related to bugs and in mature design of DataSnap framework. I will show at here how to deals with the problems
Design a DataSnap module
First, we design a simple DataSnap module for this example. This is a TDSServerModule descendant instance with 2 components: a TDataSetProvider and a TClientDataSet instance. The reason using TDSServerModule is it will manage providers define in the module.
MySeverProvider.DFM
object ServerProvider: TServerProvider      
  OldCreateOrder = False       
  OnCreate = DSServerModuleCreate       
  Height = 225       
  Width = 474       
  object DataSetProvider1: TDataSetProvider       
    DataSet = ClientDataSet1       
    Left = 88       
    Top = 56       
  end       
  object ClientDataSet1: TClientDataSet       
    Aggregates = <>       
    Params = <>       
    Left = 200       
    Top = 56       
  end       
end
MyServerProvider.PAS
type      
  TServerProvider = class(TDSServerModule)       
    DataSetProvider1: TDataSetProvider;       
    ClientDataSet1: TClientDataSet;       
    procedure DSServerModuleCreate(Sender: TObject);       
  end; 
{$R *.dfm}
procedure TServerProvider.DSServerModuleCreate(Sender: TObject);      
begin       
  ClientDataSet1.LoadFromFile('..\orders.cds');       
end;
Define a transport layer for the provider module
Since this is a in-process application, we don’t really need a physical transport layer for the provider module. What we need here is a TDSServer and a TDSServerClass instance that helps propagate the providers to ClientDataSet in later stage.
var C: TDSServer:      
    D: TDSServerClass;       
begin       
  C := TDSServer.Create(nil);       
  D := TDSServerClass.Create(nil);       
  try       
    C.Server := D;       
    C.OnGetClass := OnGetClass;       
    D.Start;       
    
  finally       
    D.Free;       
    C.Free;       
  end;       
end;
procedure TForm1.OnGetClass(DSServerClass: TDSServerClass; var      
    PersistentClass: TPersistentClass);       
begin       
  PersistentClass := TServerProvider;       
end;
Use TDSProviderConnection to consume the in-process DataSnap service
We start hook up everything in DataSnap context to get it done:
var Q: TSQLConnection;      
    D: TDSServer;       
    C: TDSServerClass;       
    P: TServerProvider;       
    N: TDSProviderConnection;       
begin       
  P := TServerProvider.Create(nil);       
  D := TDSServer.Create(nil);       
  C := TDSServerClass.Create(nil);       
  Q := TSQLConnection.Create(nil);       
  N := TDSProviderConnection.Create(nil);       
  try       
    C.Server := D;       
    C.OnGetClass := OnGetClass; 
D.Start;
    Q.DriverName := 'DSServer';      
    Q.LoginPrompt := False;       
    Q.Open; 
    N.SQLConnection := Q;      
    N.ServerClassName := 'TServerProvider';       
    N.Connected := True; 
    ClientDataSet1.RemoteServer := N;      
    ClientDataSet1.ProviderName := 'DataSetProvider1';       
    ClientDataSet1.Open; 
    ShowMessage(IntToStr(ClientDataSet1.RecordCount));      
  finally       
    N.Free;       
    Q.Free;       
    C.Free;       
    D.Free;       
    P.Free;       
  end;       
end;
If you are using Delphi version 14.0.3513.24210 or prior than that, you will find it doesn’t work, a “Invalid pointer operation” exception raise after that.
I have found all the problems faced so far and the fixed are as follow.
Troubleshoot: Invalid pointer operation
There is a bug in DSUtil.StreamToDataPacket. I have file a report in QC#78666.
Here is a fix without changing the DBX source code:
unit DSUtil.QC78666;
interface
implementation
uses SysUtils, Variants, VarUtils, ActiveX, Classes, DBXCommonResStrs, DSUtil,      
     CodeRedirect; 
type      
  THeader = class       
    const       
      Empty       = 1;       
      Variant     = 2;       
      DataPacket  = 3;       
  end; 
  PIntArray = ^TIntArray;      
  TIntArray = array[0..0] of Integer; 
  TVarFlag = (vfByRef, vfVariant);      
  TVarFlags = set of TVarFlag; 
EInterpreterError = class(Exception);
  TVariantStreamer = class      
  private       
    class function ReadArray(VType: Integer; const Data: TStream): OleVariant;       
  public       
    class function ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;       
  end; 
const      
  EasyArrayTypes = [varSmallInt, varInteger, varSingle, varDouble, varCurrency,       
                    varDate, varBoolean, varShortInt, varByte, varWord, varLongWord]; 
  VariantSize: array[0..varLongWord] of Word  = (0, 0, SizeOf(SmallInt), SizeOf(Integer),      
    SizeOf(Single), SizeOf(Double), SizeOf(Currency), SizeOf(TDateTime), 0, 0,       
    SizeOf(Integer), SizeOf(WordBool), 0, 0, 0, 0, SizeOf(ShortInt), SizeOf(Byte),       
    SizeOf(Word), SizeOf(LongWord)); 
class function TVariantStreamer.ReadArray(VType: Integer; const Data: TStream): OleVariant;      
var       
  Flags: TVarFlags;       
  LoDim, HiDim, Indices, Bounds: PIntArray;       
  DimCount, VSize, i: Integer;       
  V: OleVariant;       
  LSafeArray: PSafeArray;       
  P: Pointer;       
begin       
  VarClear(Result);       
  Data.Read(DimCount, SizeOf(DimCount));       
  VSize := DimCount * SizeOf(Integer);       
  GetMem(LoDim, VSize);       
  try       
    GetMem(HiDim, VSize);       
    try       
      Data.Read(LoDim^, VSize);       
      Data.Read(HiDim^, VSize);       
      GetMem(Bounds, VSize * 2);       
      try       
        for i := 0 to DimCount - 1 do       
        begin       
          Bounds[i * 2] := LoDim[i];       
          Bounds[i * 2 + 1] := HiDim[i];       
        end;       
        Result := VarArrayCreate(Slice(Bounds^,DimCount * 2), VType and varTypeMask);       
      finally       
        FreeMem(Bounds);       
      end;       
      if VType and varTypeMask in EasyArrayTypes then       
      begin       
        Data.Read(VSize, SizeOf(VSize));       
        P := VarArrayLock(Result);       
        try       
          Data.Read(P^, VSize);       
        finally       
          VarArrayUnlock(Result);       
        end;       
      end else       
      begin       
        LSafeArray := PSafeArray(TVarData(Result).VArray);       
        GetMem(Indices, VSize);       
        try       
          FillChar(Indices^, VSize, 0);       
          for I := 0 to DimCount - 1 do       
            Indices[I] := LoDim[I];       
          while True do       
          begin       
            V := ReadVariant(Flags, Data);       
            if VType and varTypeMask = varVariant then       
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, V))       
            else       
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, TVarData(V).VPointer^));       
            Inc(Indices[DimCount - 1]);       
            if Indices[DimCount - 1] > HiDim[DimCount - 1] then       
              for i := DimCount - 1 downto 0 do       
                if Indices[i] > HiDim[i] then       
                begin       
                  if i = 0 then Exit;       
                  Inc(Indices[i - 1]);       
                  Indices[i] := LoDim[i];       
                end;       
          end;       
        finally       
          FreeMem(Indices);       
        end;       
      end;       
    finally       
      FreeMem(HiDim);       
    end;       
  finally       
    FreeMem(LoDim);       
  end;       
end; 
class function TVariantStreamer.ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;      
var       
  I, VType: Integer;       
  W: WideString;       
  TmpFlags: TVarFlags;       
begin       
  VarClear(Result);       
  Flags := [];       
  Data.Read(VType, SizeOf(VType));       
  if VType and varByRef = varByRef then       
    Include(Flags, vfByRef);       
  if VType = varByRef then       
  begin       
    Include(Flags, vfVariant);       
    Result := ReadVariant(TmpFlags, Data);       
    Exit;       
  end;       
  if vfByRef in Flags then       
    VType := VType xor varByRef;       
  if (VType and varArray) = varArray then       
    Result := ReadArray(VType, Data) else       
  case VType and varTypeMask of       
    varEmpty: VarClear(Result);       
    varNull: Result := NULL;       
    varOleStr:       
    begin       
      Data.Read(I, SizeOf(Integer));       
      SetLength(W, I);       
      Data.Read(W[1], I * 2);       
      Result := W;       
    end;       
    varDispatch, varUnknown:       
      raise EInterpreterError.CreateResFmt(@SBadVariantType,[IntToHex(VType,4)]);       
  else       
    TVarData(Result).VType := VType;       
    Data.Read(TVarData(Result).VPointer, VariantSize[VType and varTypeMask]);       
  end;       
end; 
procedure StreamToDataPacket(const Stream: TStream; out VarBytes: OleVariant);      
var       
  P: Pointer;       
  ByteCount: Integer;       
  Size: Int64;       
begin       
  Stream.Read(Size, 8);       
  ByteCount := Integer(Size);       
  if ByteCount > 0 then       
  begin       
    VarBytes := VarArrayCreate([0, ByteCount-1], varByte);       
    P := VarArrayLock(VarBytes);       
    try       
//      Stream.Position := 0;   // QC#78666 "Mismatched in datapacket" with DSUtil.StreamToDataPacket       
      Stream.Read(P^, ByteCount);       
      Stream.Position := 0;       
    finally       
      VarArrayUnlock(VarBytes);       
    end;       
  end       
  else       
    VarBytes := Null;       
end; 
procedure StreamToVariantPatch(const Stream: TStream; out VariantValue: OleVariant);      
var       
  Flags: TVarFlags;       
  Header: Byte;       
begin       
  if Assigned(Stream) then       
  begin       
    Stream.Position := 0;       
    Stream.Read(Header, 1);       
    if Header = THeader.Variant then       
      VariantValue := TVariantStreamer.ReadVariant(Flags, Stream)       
    else if Header = THeader.DataPacket then       
      StreamToDataPacket(Stream, VariantValue)       
    else       
      Assert(false);       
  end;       
end; 
var QC78666: TCodeRedirect;
initialization      
  QC78666 := TCodeRedirect.Create(@StreamToVariant, @StreamToVariantPatch);       
finalization       
  QC78666.Free;       
end.
Troubleshoot: I still encounter “Invalid Pointer Operation” after apply DSUtil.StreamToDataPacket patch
I have filed this problem in QC#78752. An in-process DataSnap create an instance of TDSServerCommand. A method of TDSServerCommand create TDBXNoOpRow instance:
function TDSServerCommand.CreateParameterRow: TDBXRow;     
begin      
  Result := TDBXNoOpRow.Create(FDbxContext);      
end;
Most of the methods in TDBXNoOpRow is not implemented.  There are 2 methods in class TDBXNoOpRow, GetStream and SetStream are used in subsequence operations.  This is the reason that cause the exception.   
After fix TDBXNoOpRow problem, the data packet will transport to ClientDataSet successfully.
The fix is as follow:
unit DBXCommonServer.QC78752;
interface
uses SysUtils, Classes, DBXCommon, DSCommonServer, DBXCommonTable;
type     
  TDSServerCommand_Patch = class(TDSServerCommand)      
  protected      
    function CreateParameterRowPatch: TDBXRow;      
  end; 
  TDBXNoOpRowPatch = class(TDBXNoOpRow)     
  private      
    function GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes): Integer;      
  protected      
    procedure GetStream(DbxValue: TDBXStreamValue; var Stream: TStream; var IsNull:      
        LongBool); override;      
    procedure SetStream(DbxValue: TDBXStreamValue; StreamReader: TDBXStreamReader);      
        override;      
    function UseExtendedTypes: Boolean; override;      
  end; 
  TDBXStreamValueAccess = class(TDBXByteArrayValue)     
  private      
    FStreamStreamReader: TDBXLookAheadStreamReader;      
  end; 
implementation
uses CodeRedirect;
function TDSServerCommand_Patch.CreateParameterRowPatch: TDBXRow;     
begin      
  Result := TDBXNoOpRowPatch.Create(FDbxContext);      
end; 
procedure TDBXNoOpRowPatch.GetStream(DbxValue: TDBXStreamValue; var Stream: TStream;     
    var IsNull: LongBool);      
var iSize: integer;      
    B: TBytes;      
begin      
  iSize := GetBytesFromStreamReader(TDBXStreamValueAccess(DbxValue).FStreamStreamReader, B);      
  IsNull := iSize = 0;      
  if not IsNull then begin      
    Stream := TMemoryStream.Create;      
    Stream.Write(B[0], iSize);      
  end;      
end; 
procedure TDBXNoOpRowPatch.SetStream(DbxValue: TDBXStreamValue; StreamReader:     
    TDBXStreamReader);      
var B: TBytes;      
    iSize: integer;      
begin      
  iSize := GetBytesFromStreamReader(StreamReader, B);      
  Dbxvalue.SetDynamicBytes(0, B, 0, iSize);      
end; 
function TDBXNoOpRowPatch.GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes):     
    Integer;      
const BufSize = 50 * 1024;      
var iPos: integer;      
    iRead: integer;      
begin      
  Result := 0;      
  while not R.Eos do begin      
    SetLength(Buf, Result + BufSize);      
    iPos := Result;      
    iRead := R.Read(Buf, iPos, BufSize);      
    Inc(Result, iRead);      
  end;      
  SetLength(Buf, Result);      
end; 
function TDBXNoOpRowPatch.UseExtendedTypes: Boolean;     
begin      
  Result := True;      
end; 
var QC78752: TCodeRedirect;
initialization     
  QC78752 := TCodeRedirect.Create(@TDSServerCommand_Patch.CreateParameterRow, @TDSServerCommand_Patch.CreateParameterRowPatch);      
finalization      
  QC78752.Free;      
end.
Troubleshoot: Both patches applied and work for the example but I still encounter “Invalid Pointer Operation”
This problem also filed in QC#78752. The problem is due to the following 2 methods:
- procedure TDBXStreamValue.SetValue
 - function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
 
TDBXLookAheadStreamReader.ConvertToMemoryStream return a managed FStream object to TDBXStreamValue.SetValue. This stream object become another managed object of TDBXStreamValue. It turns out that a Stream object managed by two objects and the exception raised when these 2 objects attempt to free the Stream object:
procedure TDBXStreamValue.SetValue(const Value: TDBXValue);     
begin      
  if Value.IsNull then      
    SetNull      
  else      
  begin      
    SetStream(Value.GetStream(False), True);      
  end;      
end;      
function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;      
...      
begin      
  if FStream = nil then      
    Result := nil      
  else      
  begin      
    Count := Size;      
    if not (FStream is TMemoryStream) then      
    begin      
      ...      
      StreamTemp := FStream;      
      FStream := Stream;      
      FreeAndNil(StreamTemp);      
    end;      
    FStream.Seek(0, soFromBeginning);      
    FHasLookAheadByte := false;      
    Result := FStream;      
  end;      
end;
The fix is as follow:
unit DBXCommon.QC78752;
interface
implementation
uses SysUtils, Classes, DBXCommon, CodeRedirect;
type     
  TDBXLookAheadStreamReaderAccess = class(TDBXStreamReader)      
  private      
    FStream: TStream;      
    FEOS:               Boolean;      
    FHasLookAheadByte:  Boolean;      
    FLookAheadByte:     Byte;      
  end; 
  TDBXLookAheadStreamReaderHelper = class helper for TDBXLookAheadStreamReader     
  private      
    function Accessor: TDBXLookAheadStreamReaderAccess;      
  public      
    function ConvertToMemoryStreamPatch: TStream;      
  end; 
function TDBXLookAheadStreamReaderHelper.Accessor:     
    TDBXLookAheadStreamReaderAccess;      
begin      
  Result := TDBXLookAheadStreamReaderAccess(Self);      
end; 
function TDBXLookAheadStreamReaderHelper.ConvertToMemoryStreamPatch: TStream;     
var      
  Stream: TMemoryStream;      
  StreamTemp: TStream;      
  Count: Integer;      
  Buffer: TBytes;      
  ReadBytes: Integer;      
begin      
  if Accessor.FStream = nil then      
    Result := nil      
  else      
  begin      
    Count := Size;      
    if not (Accessor.FStream is TMemoryStream) then      
    begin      
      Stream := TMemoryStream.Create;      
      if Count >= 0 then      
        Stream.SetSize(Count);      
      if Accessor.FHasLookAheadByte then      
        Stream.Write(Accessor.FLookAheadByte, 1);      
      SetLength(Buffer, 256);      
      while true do      
      begin      
        ReadBytes := Accessor.FStream.Read(Buffer, Length(Buffer));      
        if ReadBytes > 0 then      
          Stream.Write(Buffer, ReadBytes)      
        else      
          Break;      
      end;      
      StreamTemp := Accessor.FStream;      
      Accessor.FStream := Stream;      
      FreeAndNil(StreamTemp);      
      Result := Accessor.FStream;      
    end else begin      
      Stream := TMemoryStream.Create;      
      Accessor.FStream.Seek(0, soFromBeginning);      
      Stream.CopyFrom(Accessor.FStream, Accessor.FStream.Size);      
    end;      
    Stream.Seek(0, soFromBeginning);      
    Accessor.FHasLookAheadByte := false; 
    Result := Stream;     
//    Stream := TMemoryStream.Create;      
//    Stream.LoadFromStream(FStream);      
//    FStream.Seek(0, soFromBeginning);      
//    Result := Stream;      
  end;      
end; 
var QC78752: TCodeRedirect;
initialization     
  QC78752 := TCodeRedirect.Create(@TDBXLookAheadStreamReader.ConvertToMemoryStream, @TDBXLookAheadStreamReader.ConvertToMemoryStreamPatch);      
finalization      
  QC78752.Free;      
end.
Troubleshoot: I encounter memory leaks after close the application
There is a memory leaks in TDSServerConnection for in-process connection. I have filed a report in QC#78696.
Here is the fix:
unit DSServer.QC78696;
interface
implementation
uses SysUtils,      
     DBXCommon, DSServer, DSCommonServer, DBXMessageHandlerCommon, DBXSqlScanner,       
     DBXTransport,       
     CodeRedirect; 
type      
  TDSServerConnectionHandlerAccess = class(TDBXConnectionHandler)       
    FConProperties: TDBXProperties;       
    FConHandle: Integer;       
    FServer: TDSCustomServer;       
    FDatabaseConnectionHandler: TObject;       
    FHasServerConnection: Boolean;       
    FInstanceProvider: TDSHashtableInstanceProvider;       
    FCommandHandlers: TDBXCommandHandlerArray;       
    FLastCommandHandler: Integer;       
    FNextHandler: TDBXConnectionHandler;       
    FErrorMessage: TDBXErrorMessage;       
    FScanner: TDBXSqlScanner;       
    FDbxConnection: TDBXConnection;       
    FTransport: TDSServerTransport;       
    FChannel: TDbxChannel;       
    FCreateInstanceEventObject: TDSCreateInstanceEventObject;       
    FDestroyInstanceEventObject: TDSDestroyInstanceEventObject;       
    FPrepareEventObject: TDSPrepareEventObject;       
    FConnectEventObject: TDSConnectEventObject;       
    FErrorEventObject: TDSErrorEventObject;       
    FServerCon: TDSServerConnection;       
  end; 
  TDSServerConnectionPatch = class(TDSServerConnection)      
  public       
    destructor Destroy; override;       
  end; 
  TDSServerDriverPatch = class(TDSServerDriver)      
  protected       
    function CreateConnectionPatch(ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;       
  end; 
destructor TDSServerConnectionPatch.Destroy;      
begin       
  inherited Destroy;       
  TDSServerConnectionHandlerAccess(ServerConnectionHandler).FServerCon := nil;       
  ServerConnectionHandler.Free;       
end; 
function TDSServerDriverPatch.CreateConnectionPatch(      
  ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;       
begin       
  Result := TDSServerConnectionPatch.Create(ConnectionBuilder);       
end; 
var QC78696: TCodeRedirect;
initialization      
  QC78696 := TCodeRedirect.Create(@TDSServerDriverPatch.CreateConnection, @TDSServerDriverPatch.CreateConnectionPatch);       
finalization       
  QC78696.Free;       
end.
Congratulations - Excelent solucions and explanation about those bugs.
ReplyDelete