Monday, October 19, 2009

DataSnap: In-process IAppServer connection via TDSProviderConnection

Classic DataSnap

Prior to Delphi 2009, we could use either TLocalConnection or TSocketConnection together with TConnectionBroker for in-process or out-of-process communication via the IAppServer interface.  There are several more DataSnap connection components that support IAppServer.  Check the Delphi help for details.
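As a rough illustration of the classic in-process wiring, here is a minimal sketch.  It assumes the same orders.cds file used later in this post; the unit name, procedure name and the Container component are made up for the example.  TLocalConnection serves the providers that share its owner, which is why everything is created under one owner here.

```delphi
unit ClassicLocalDemo;

interface

procedure ShowLocalRecordCount;

implementation

uses
  SysUtils, Classes, Dialogs, DBClient, Provider,
  TConnect;  // TLocalConnection

procedure ShowLocalRecordCount;
var
  Container: TComponent;        // hypothetical common owner
  Source, Client: TClientDataSet;
  Prov: TDataSetProvider;
  Local: TLocalConnection;
  Broker: TConnectionBroker;
begin
  Container := TComponent.Create(nil);
  try
    // "Server side": a dataset exposed through a provider
    Source := TClientDataSet.Create(Container);
    Source.LoadFromFile('..\orders.cds');
    Prov := TDataSetProvider.Create(Container);
    Prov.Name := 'DataSetProvider1';
    Prov.DataSet := Source;

    // In-process IAppServer connection, reached through a broker
    Local := TLocalConnection.Create(Container);
    Broker := TConnectionBroker.Create(Container);
    Broker.Connection := Local;

    // "Client side": a dataset that asks the broker for the provider
    Client := TClientDataSet.Create(Container);
    Client.ConnectionBroker := Broker;
    Client.ProviderName := 'DataSetProvider1';
    Client.Open;

    ShowMessage(IntToStr(Client.RecordCount));
  finally
    Container.Free;  // frees every component it owns
  end;
end;

end.
```

Swapping TLocalConnection for TSocketConnection should be the only change needed to go out-of-process, since the client dataset only talks to the broker.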

New DataSnap from Delphi 2009

Previously, TSQLConnection was used in the DataSnap server only.  In the new DataSnap, we may also use TSQLConnection in the DataSnap client.  There is a new driver called DataSnap that allows us to connect to a DataSnap server via either the TCP or HTTP protocol, using REST data packets, for multi-tier applications.  Furthermore, we may connect to a TDSServer for an in-process connection by setting TSQLConnection.DriverName to TDSServer.Name.  This lets us write a scalable multi-tier DataSnap application that consumes server methods.  See here for more details.

In Delphi 2009/2010, a new DataSnap connection component, TDSProviderConnection, was introduced.  As the name implies, it supplies providers from a DataSnap server.  This connection requires a TSQLConnection instance to work with in the client tier.  Thus, we may use a single TSQLConnection in the client tier for either in-process or out-of-process connections.  That fulfills the design philosophy of a scalable multi-tier DataSnap application.
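For comparison with the in-process case, here is a sketch of the out-of-process wiring.  It assumes a DataSnap server listening on localhost at TCP port 211 that exposes a server class named TServerProvider with a provider named DataSetProvider1; those names, the unit name and the procedure name are placeholders for this example only.

```delphi
unit RemoteProviderDemo;

interface

procedure ShowRemoteRecordCount;

implementation

uses
  SysUtils, Dialogs, DBClient, SqlExpr, DBXCommon,
  DBXDataSnap,  // registers the DataSnap client driver
  DSConnect;    // TDSProviderConnection

procedure ShowRemoteRecordCount;
var
  Q: TSQLConnection;
  N: TDSProviderConnection;
  CDS: TClientDataSet;
begin
  Q := TSQLConnection.Create(nil);
  N := TDSProviderConnection.Create(nil);
  CDS := TClientDataSet.Create(nil);
  try
    // Out-of-process: the DataSnap driver talks to a remote server
    Q.DriverName := 'DataSnap';
    Q.Params.Values['HostName'] := 'localhost';  // assumed host
    Q.Params.Values['Port'] := '211';            // assumed port
    Q.LoginPrompt := False;
    Q.Open;

    // The provider connection rides on the TSQLConnection
    N.SQLConnection := Q;
    N.ServerClassName := 'TServerProvider';      // placeholder class name
    N.Connected := True;

    CDS.RemoteServer := N;
    CDS.ProviderName := 'DataSetProvider1';      // placeholder provider name
    CDS.Open;
    ShowMessage(IntToStr(CDS.RecordCount));
  finally
    CDS.Free;
    N.Free;
    Q.Free;
  end;
end;

end.
```

Only the TSQLConnection settings differ between the two designs; the TDSProviderConnection and TClientDataSet wiring should stay the same.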

There are lots of demos and CodeRage videos available on the web showing how to use TDSProviderConnection in the DataSnap client tier.  However, most of the examples only show the out-of-process design.  While writing this topic, I never found one example illustrating the usage of TDSProviderConnection for an in-process design.  I hope there will be more from other famous or well-known Delphi fans.

At first, I thought it would be easy to use TDSProviderConnection for an in-process design.  But I faced problems while following the rules.  These problems appear to be related to bugs and the immature design of the DataSnap framework.  I will show here how to deal with the problems.

Design a DataSnap module

First, we design a simple DataSnap module for this example.  This is a TDSServerModule descendant with 2 components: a TDataSetProvider and a TClientDataSet instance.  The reason for using TDSServerModule is that it manages the providers defined in the module.

MyServerProvider.DFM

object ServerProvider: TServerProvider
  OldCreateOrder = False
  OnCreate = DSServerModuleCreate
  Height = 225
  Width = 474
  object DataSetProvider1: TDataSetProvider
    DataSet = ClientDataSet1
    Left = 88
    Top = 56
  end
  object ClientDataSet1: TClientDataSet
    Aggregates = <>
    Params = <>
    Left = 200
    Top = 56
  end
end

MyServerProvider.PAS

type
  TServerProvider = class(TDSServerModule)
    DataSetProvider1: TDataSetProvider;
    ClientDataSet1: TClientDataSet;
    procedure DSServerModuleCreate(Sender: TObject);
  end;

{$R *.dfm}

procedure TServerProvider.DSServerModuleCreate(Sender: TObject);
begin
  ClientDataSet1.LoadFromFile('..\orders.cds');
end;

Define a transport layer for the provider module

Since this is an in-process application, we don’t really need a physical transport layer for the provider module.  What we need here is a TDSServer and a TDSServerClass instance that help propagate the providers to the ClientDataSet at a later stage.

var D: TDSServer;
    C: TDSServerClass;
begin
  D := TDSServer.Create(nil);
  C := TDSServerClass.Create(nil);
  try
    C.Server := D;               // attach the server class to the server
    C.OnGetClass := OnGetClass;  // reports which module class to expose
    D.Start;

  finally
    C.Free;
    D.Free;
  end;
end;

procedure TForm1.OnGetClass(DSServerClass: TDSServerClass; var
    PersistentClass: TPersistentClass);
begin
  PersistentClass := TServerProvider;
end;

Use TDSProviderConnection to consume the in-process DataSnap service

We start hooking up everything in the DataSnap context to get it done:

var Q: TSQLConnection;
    D: TDSServer;
    C: TDSServerClass;
    P: TServerProvider;
    N: TDSProviderConnection;
begin
  P := TServerProvider.Create(nil);
  D := TDSServer.Create(nil);
  C := TDSServerClass.Create(nil);
  Q := TSQLConnection.Create(nil);
  N := TDSProviderConnection.Create(nil);
  try
    C.Server := D;
    C.OnGetClass := OnGetClass;

    D.Start;

    Q.DriverName := 'DSServer';
    Q.LoginPrompt := False;
    Q.Open;

    N.SQLConnection := Q;
    N.ServerClassName := 'TServerProvider';
    N.Connected := True;

    ClientDataSet1.RemoteServer := N;
    ClientDataSet1.ProviderName := 'DataSetProvider1';
    ClientDataSet1.Open;

    ShowMessage(IntToStr(ClientDataSet1.RecordCount));
  finally
    N.Free;
    Q.Free;
    C.Free;
    D.Free;
    P.Free;
  end;
end;

If you are using Delphi version 14.0.3513.24210 or earlier, you will find that it doesn’t work: an “Invalid pointer operation” exception is raised.

I have found fixes for all the problems faced so far, as follows.

Troubleshoot: Invalid pointer operation

There is a bug in DSUtil.StreamToDataPacket.  I have filed a report as QC#78666.

Here is a fix without changing the DBX source code:

unit DSUtil.QC78666;

interface

implementation

uses SysUtils, Variants, VarUtils, ActiveX, Classes, DBXCommonResStrs, DSUtil,
     CodeRedirect;

type
  THeader = class
    const
      Empty       = 1;
      Variant     = 2;
      DataPacket  = 3;
  end;

  PIntArray = ^TIntArray;
  TIntArray = array[0..0] of Integer;

  TVarFlag = (vfByRef, vfVariant);
  TVarFlags = set of TVarFlag;

  EInterpreterError = class(Exception);

  TVariantStreamer = class
  private
    class function ReadArray(VType: Integer; const Data: TStream): OleVariant;
  public
    class function ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
  end;

const
  EasyArrayTypes = [varSmallInt, varInteger, varSingle, varDouble, varCurrency,
                    varDate, varBoolean, varShortInt, varByte, varWord, varLongWord];

  VariantSize: array[0..varLongWord] of Word  = (0, 0, SizeOf(SmallInt), SizeOf(Integer),
    SizeOf(Single), SizeOf(Double), SizeOf(Currency), SizeOf(TDateTime), 0, 0,
    SizeOf(Integer), SizeOf(WordBool), 0, 0, 0, 0, SizeOf(ShortInt), SizeOf(Byte),
    SizeOf(Word), SizeOf(LongWord));

class function TVariantStreamer.ReadArray(VType: Integer; const Data: TStream): OleVariant;
var
  Flags: TVarFlags;
  LoDim, HiDim, Indices, Bounds: PIntArray;
  DimCount, VSize, i: Integer;
  V: OleVariant;
  LSafeArray: PSafeArray;
  P: Pointer;
begin
  VarClear(Result);
  Data.Read(DimCount, SizeOf(DimCount));
  VSize := DimCount * SizeOf(Integer);
  GetMem(LoDim, VSize);
  try
    GetMem(HiDim, VSize);
    try
      Data.Read(LoDim^, VSize);
      Data.Read(HiDim^, VSize);
      GetMem(Bounds, VSize * 2);
      try
        for i := 0 to DimCount - 1 do
        begin
          Bounds[i * 2] := LoDim[i];
          Bounds[i * 2 + 1] := HiDim[i];
        end;
        Result := VarArrayCreate(Slice(Bounds^,DimCount * 2), VType and varTypeMask);
      finally
        FreeMem(Bounds);
      end;
      if VType and varTypeMask in EasyArrayTypes then
      begin
        Data.Read(VSize, SizeOf(VSize));
        P := VarArrayLock(Result);
        try
          Data.Read(P^, VSize);
        finally
          VarArrayUnlock(Result);
        end;
      end else
      begin
        LSafeArray := PSafeArray(TVarData(Result).VArray);
        GetMem(Indices, VSize);
        try
          FillChar(Indices^, VSize, 0);
          for I := 0 to DimCount - 1 do
            Indices[I] := LoDim[I];
          while True do
          begin
            V := ReadVariant(Flags, Data);
            if VType and varTypeMask = varVariant then
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, V))
            else
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, TVarData(V).VPointer^));
            Inc(Indices[DimCount - 1]);
            if Indices[DimCount - 1] > HiDim[DimCount - 1] then
              for i := DimCount - 1 downto 0 do
                if Indices[i] > HiDim[i] then
                begin
                  if i = 0 then Exit;
                  Inc(Indices[i - 1]);
                  Indices[i] := LoDim[i];
                end;
          end;
        finally
          FreeMem(Indices);
        end;
      end;
    finally
      FreeMem(HiDim);
    end;
  finally
    FreeMem(LoDim);
  end;
end;

class function TVariantStreamer.ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
var
  I, VType: Integer;
  W: WideString;
  TmpFlags: TVarFlags;
begin
  VarClear(Result);
  Flags := [];
  Data.Read(VType, SizeOf(VType));
  if VType and varByRef = varByRef then
    Include(Flags, vfByRef);
  if VType = varByRef then
  begin
    Include(Flags, vfVariant);
    Result := ReadVariant(TmpFlags, Data);
    Exit;
  end;
  if vfByRef in Flags then
    VType := VType xor varByRef;
  if (VType and varArray) = varArray then
    Result := ReadArray(VType, Data) else
  case VType and varTypeMask of
    varEmpty: VarClear(Result);
    varNull: Result := NULL;
    varOleStr:
    begin
      Data.Read(I, SizeOf(Integer));
      SetLength(W, I);
      Data.Read(W[1], I * 2);
      Result := W;
    end;
    varDispatch, varUnknown:
      raise EInterpreterError.CreateResFmt(@SBadVariantType,[IntToHex(VType,4)]);
  else
    TVarData(Result).VType := VType;
    Data.Read(TVarData(Result).VPointer, VariantSize[VType and varTypeMask]);
  end;
end;

procedure StreamToDataPacket(const Stream: TStream; out VarBytes: OleVariant);
var
  P: Pointer;
  ByteCount: Integer;
  Size: Int64;
begin
  Stream.Read(Size, 8);
  ByteCount := Integer(Size);
  if ByteCount > 0 then
  begin
    VarBytes := VarArrayCreate([0, ByteCount-1], varByte);
    P := VarArrayLock(VarBytes);
    try
//      Stream.Position := 0;   // QC#78666 "Mismatched in datapacket" with DSUtil.StreamToDataPacket
      Stream.Read(P^, ByteCount);
      Stream.Position := 0;
    finally
      VarArrayUnlock(VarBytes);
    end;
  end
  else
    VarBytes := Null;
end;

procedure StreamToVariantPatch(const Stream: TStream; out VariantValue: OleVariant);
var
  Flags: TVarFlags;
  Header: Byte;
begin
  if Assigned(Stream) then
  begin
    Stream.Position := 0;
    Stream.Read(Header, 1);
    if Header = THeader.Variant then
      VariantValue := TVariantStreamer.ReadVariant(Flags, Stream)
    else if Header = THeader.DataPacket then
      StreamToDataPacket(Stream, VariantValue)
    else
      Assert(false);
  end;
end;

var QC78666: TCodeRedirect;

initialization
  QC78666 := TCodeRedirect.Create(@StreamToVariant, @StreamToVariantPatch);
finalization
  QC78666.Free;
end.
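The patch units in this post rely on a TCodeRedirect class from a CodeRedirect unit that is not listed here.  The following is only a sketch of how such a class might work, not the actual unit: it overwrites the first five bytes of the old routine with a relative JMP to the new one and restores them on destruction.  It assumes 32-bit x86 code and an application built without runtime packages; the names TCodeRedirectSketch and TJumpRec are made up for the example.

```delphi
unit CodeRedirectSketch;

{$OVERFLOWCHECKS OFF}  // the address arithmetic below may wrap

interface

type
  // Five-byte x86 instruction: JMP rel32
  TJumpRec = packed record
    OpCode: Byte;
    Offset: Integer;
  end;
  PJumpRec = ^TJumpRec;

  TCodeRedirectSketch = class
  private
    FOldProc: Pointer;
    FSaved: TJumpRec;  // original bytes, restored in Destroy
    procedure Patch(const Jump: TJumpRec);
  public
    constructor Create(OldProc, NewProc: Pointer);
    destructor Destroy; override;
  end;

implementation

uses Windows, SysUtils;

procedure TCodeRedirectSketch.Patch(const Jump: TJumpRec);
var
  OldProtect, Dummy: DWORD;
begin
  // Code pages are normally not writable; lift the protection briefly.
  Win32Check(VirtualProtect(FOldProc, SizeOf(TJumpRec),
    PAGE_EXECUTE_READWRITE, OldProtect));
  try
    PJumpRec(FOldProc)^ := Jump;
    FlushInstructionCache(GetCurrentProcess, FOldProc, SizeOf(TJumpRec));
  finally
    VirtualProtect(FOldProc, SizeOf(TJumpRec), OldProtect, Dummy);
  end;
end;

constructor TCodeRedirectSketch.Create(OldProc, NewProc: Pointer);
var
  Jump: TJumpRec;
begin
  inherited Create;
  FOldProc := OldProc;
  FSaved := PJumpRec(OldProc)^;
  Jump.OpCode := $E9;
  // rel32 is measured from the end of the 5-byte JMP instruction
  Jump.Offset := Integer(NewProc) - Integer(OldProc) - SizeOf(TJumpRec);
  Patch(Jump);
end;

destructor TCodeRedirectSketch.Destroy;
begin
  if FOldProc <> nil then
    Patch(FSaved);  // put the original bytes back
  inherited;
end;

end.
```

Because the jump is written at unit initialization, every later call to the old routine lands in the replacement, which is why the fixes here need no change to the DBX source code.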

Troubleshoot: I still encounter “Invalid Pointer Operation” after applying the DSUtil.StreamToDataPacket patch

I have filed this problem as QC#78752.  An in-process DataSnap connection creates an instance of TDSServerCommand.  A method of TDSServerCommand creates a TDBXNoOpRow instance:

function TDSServerCommand.CreateParameterRow: TDBXRow;
begin
  Result := TDBXNoOpRow.Create(FDbxContext);
end;

Most of the methods in TDBXNoOpRow are not implemented.  Two methods of class TDBXNoOpRow, GetStream and SetStream, are used in subsequent operations.  This is what causes the exception.
After fixing the TDBXNoOpRow problem, the data packet is transported to the ClientDataSet successfully.

The fix is as follows:

unit DBXCommonServer.QC78752;

interface

uses SysUtils, Classes, DBXCommon, DSCommonServer, DBXCommonTable;

type
  TDSServerCommand_Patch = class(TDSServerCommand)
  protected
    function CreateParameterRowPatch: TDBXRow;
  end;

  TDBXNoOpRowPatch = class(TDBXNoOpRow)
  private
    function GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes): Integer;
  protected
    procedure GetStream(DbxValue: TDBXStreamValue; var Stream: TStream; var IsNull:
        LongBool); override;
    procedure SetStream(DbxValue: TDBXStreamValue; StreamReader: TDBXStreamReader);
        override;
    function UseExtendedTypes: Boolean; override;
  end;

  TDBXStreamValueAccess = class(TDBXByteArrayValue)
  private
    FStreamStreamReader: TDBXLookAheadStreamReader;
  end;

implementation

uses CodeRedirect;

function TDSServerCommand_Patch.CreateParameterRowPatch: TDBXRow;
begin
  Result := TDBXNoOpRowPatch.Create(FDbxContext);
end;

procedure TDBXNoOpRowPatch.GetStream(DbxValue: TDBXStreamValue; var Stream: TStream;
    var IsNull: LongBool);
var iSize: integer;
    B: TBytes;
begin
  iSize := GetBytesFromStreamReader(TDBXStreamValueAccess(DbxValue).FStreamStreamReader, B);
  IsNull := iSize = 0;
  if not IsNull then begin
    Stream := TMemoryStream.Create;
    Stream.Write(B[0], iSize);
  end;
end;

procedure TDBXNoOpRowPatch.SetStream(DbxValue: TDBXStreamValue; StreamReader:
    TDBXStreamReader);
var B: TBytes;
    iSize: integer;
begin
  iSize := GetBytesFromStreamReader(StreamReader, B);
  Dbxvalue.SetDynamicBytes(0, B, 0, iSize);
end;

function TDBXNoOpRowPatch.GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes):
    Integer;
const BufSize = 50 * 1024;
var iPos: integer;
    iRead: integer;
begin
  Result := 0;
  while not R.Eos do begin
    SetLength(Buf, Result + BufSize);
    iPos := Result;
    iRead := R.Read(Buf, iPos, BufSize);
    Inc(Result, iRead);
  end;
  SetLength(Buf, Result);
end;

function TDBXNoOpRowPatch.UseExtendedTypes: Boolean;
begin
  Result := True;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDSServerCommand_Patch.CreateParameterRow, @TDSServerCommand_Patch.CreateParameterRowPatch);
finalization
  QC78752.Free;
end.

Troubleshoot: Both patches are applied and work for the example, but I still encounter “Invalid Pointer Operation”

This problem is also filed in QC#78752.  The problem is due to the following 2 methods:

  1. procedure TDBXStreamValue.SetValue
  2. function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;

TDBXLookAheadStreamReader.ConvertToMemoryStream returns its managed FStream object to TDBXStreamValue.SetValue.  This stream object then also becomes a managed object of TDBXStreamValue.  As a result, one stream object is managed by two objects, and the exception is raised when both objects attempt to free it:

procedure TDBXStreamValue.SetValue(const Value: TDBXValue);
begin
  if Value.IsNull then
    SetNull
  else
  begin
    SetStream(Value.GetStream(False), True);
  end;
end;

function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
...
begin
  if FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (FStream is TMemoryStream) then
    begin
      ...
      StreamTemp := FStream;
      FStream := Stream;
      FreeAndNil(StreamTemp);
    end;
    FStream.Seek(0, soFromBeginning);
    FHasLookAheadByte := false;
    Result := FStream;
  end;
end;
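The double ownership described above can be reproduced in isolation.  The sketch below uses a hypothetical TStreamHolder class as a stand-in for the two DBX objects; it is not DBX code.  Both holders free the same stream, so the second Free works on memory that has already been released.  Depending on the memory manager, this typically surfaces as an “Invalid pointer operation” or an access violation.

```delphi
program SharedStreamDemo;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical stand-in for an object that frees the stream it holds
  TStreamHolder = class
  private
    FStream: TStream;
  public
    constructor Create(AStream: TStream);
    destructor Destroy; override;
  end;

constructor TStreamHolder.Create(AStream: TStream);
begin
  inherited Create;
  FStream := AStream;
end;

destructor TStreamHolder.Destroy;
begin
  FStream.Free;  // each holder believes it owns the stream
  inherited;
end;

var
  Shared: TStream;
  A, B: TStreamHolder;
begin
  Shared := TMemoryStream.Create;
  A := TStreamHolder.Create(Shared);
  B := TStreamHolder.Create(Shared);
  try
    A.Free;  // releases Shared
    B.Free;  // tries to release Shared a second time
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
end.
```

Giving each owner its own copy of the stream, which is what the else branch of the patch in the next listing does, avoids the second release.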

The fix is as follows:

unit DBXCommon.QC78752;

interface

implementation

uses SysUtils, Classes, DBXCommon, CodeRedirect;

type
  TDBXLookAheadStreamReaderAccess = class(TDBXStreamReader)
  private
    FStream: TStream;
    FEOS:               Boolean;
    FHasLookAheadByte:  Boolean;
    FLookAheadByte:     Byte;
  end;

  TDBXLookAheadStreamReaderHelper = class helper for TDBXLookAheadStreamReader
  private
    function Accessor: TDBXLookAheadStreamReaderAccess;
  public
    function ConvertToMemoryStreamPatch: TStream;
  end;

function TDBXLookAheadStreamReaderHelper.Accessor:
    TDBXLookAheadStreamReaderAccess;
begin
  Result := TDBXLookAheadStreamReaderAccess(Self);
end;

function TDBXLookAheadStreamReaderHelper.ConvertToMemoryStreamPatch: TStream;
var
  Stream: TMemoryStream;
  StreamTemp: TStream;
  Count: Integer;
  Buffer: TBytes;
  ReadBytes: Integer;
begin
  if Accessor.FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (Accessor.FStream is TMemoryStream) then
    begin
      Stream := TMemoryStream.Create;
      if Count >= 0 then
        Stream.SetSize(Count);
      if Accessor.FHasLookAheadByte then
        Stream.Write(Accessor.FLookAheadByte, 1);
      SetLength(Buffer, 256);
      while true do
      begin
        ReadBytes := Accessor.FStream.Read(Buffer, Length(Buffer));
        if ReadBytes > 0 then
          Stream.Write(Buffer, ReadBytes)
        else
          Break;
      end;
      StreamTemp := Accessor.FStream;
      Accessor.FStream := Stream;
      FreeAndNil(StreamTemp);
      Result := Accessor.FStream;
    end else begin
      Stream := TMemoryStream.Create;
      Accessor.FStream.Seek(0, soFromBeginning);
      Stream.CopyFrom(Accessor.FStream, Accessor.FStream.Size);
    end;
    Stream.Seek(0, soFromBeginning);
    Accessor.FHasLookAheadByte := false;

    Result := Stream;
//    Stream := TMemoryStream.Create;
//    Stream.LoadFromStream(FStream);
//    FStream.Seek(0, soFromBeginning);
//    Result := Stream;
  end;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDBXLookAheadStreamReader.ConvertToMemoryStream, @TDBXLookAheadStreamReader.ConvertToMemoryStreamPatch);
finalization
  QC78752.Free;
end.

Troubleshoot: I encounter memory leaks after closing the application

There is a memory leak in TDSServerConnection for in-process connections.  I have filed a report as QC#78696.
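One way to see leaks like this is the ReportMemoryLeaksOnShutdown variable from the System unit.  The small console program below is only an illustration, with a deliberately leaked TStringList standing in for the leaked DataSnap objects; setting the same variable at the start of the real application should list the leaked classes when it closes.

```delphi
program LeakReportDemo;

{$APPTYPE CONSOLE}

uses
  Classes;

begin
  // Ask the memory manager to report unfreed blocks at shutdown
  ReportMemoryLeaksOnShutdown := True;

  // Deliberate leak for demonstration: created and never freed
  TStringList.Create;
end.
```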

Here is the fix:

unit DSServer.QC78696;

interface

implementation

uses SysUtils,
     DBXCommon, DSServer, DSCommonServer, DBXMessageHandlerCommon, DBXSqlScanner,
     DBXTransport,
     CodeRedirect;

type
  TDSServerConnectionHandlerAccess = class(TDBXConnectionHandler)
    FConProperties: TDBXProperties;
    FConHandle: Integer;
    FServer: TDSCustomServer;
    FDatabaseConnectionHandler: TObject;
    FHasServerConnection: Boolean;
    FInstanceProvider: TDSHashtableInstanceProvider;
    FCommandHandlers: TDBXCommandHandlerArray;
    FLastCommandHandler: Integer;
    FNextHandler: TDBXConnectionHandler;
    FErrorMessage: TDBXErrorMessage;
    FScanner: TDBXSqlScanner;
    FDbxConnection: TDBXConnection;
    FTransport: TDSServerTransport;
    FChannel: TDbxChannel;
    FCreateInstanceEventObject: TDSCreateInstanceEventObject;
    FDestroyInstanceEventObject: TDSDestroyInstanceEventObject;
    FPrepareEventObject: TDSPrepareEventObject;
    FConnectEventObject: TDSConnectEventObject;
    FErrorEventObject: TDSErrorEventObject;
    FServerCon: TDSServerConnection;
  end;

  TDSServerConnectionPatch = class(TDSServerConnection)
  public
    destructor Destroy; override;
  end;

  TDSServerDriverPatch = class(TDSServerDriver)
  protected
    function CreateConnectionPatch(ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
  end;

destructor TDSServerConnectionPatch.Destroy;
begin
  inherited Destroy;
  TDSServerConnectionHandlerAccess(ServerConnectionHandler).FServerCon := nil;
  ServerConnectionHandler.Free;
end;

function TDSServerDriverPatch.CreateConnectionPatch(
  ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
begin
  Result := TDSServerConnectionPatch.Create(ConnectionBuilder);
end;

var QC78696: TCodeRedirect;

initialization
  QC78696 := TCodeRedirect.Create(@TDSServerDriverPatch.CreateConnection, @TDSServerDriverPatch.CreateConnectionPatch);
finalization
  QC78696.Free;
end.
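Each patch unit installs its redirect in its initialization section, so listing the units in the project should be enough to activate them.  The project file below is a sketch: the project name, file paths and the Unit1/Form1 main form are assumptions, and the CodeRedirect unit is expected to be on the search path.

```delphi
program InProcessDemo;

uses
  Forms,
  // Patch units from this post; their initialization sections
  // install the redirects before any DataSnap call is made.
  DSUtil.QC78666 in 'DSUtil.QC78666.pas',
  DBXCommonServer.QC78752 in 'DBXCommonServer.QC78752.pas',
  DBXCommon.QC78752 in 'DBXCommon.QC78752.pas',
  DSServer.QC78696 in 'DSServer.QC78696.pas',
  Unit1 in 'Unit1.pas' {Form1};  // hypothetical main form unit

{$R *.res}

begin
  Application.Initialize;
  Application.CreateForm(TForm1, Form1);
  Application.Run;
end.
```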

1 comment:

DelphiLib said...

Congratulations - Excellent solutions and explanation about those bugs.