Classic DataSnap
Prior to Delphi 2009, we could use either TLocalConnection or TSocketConnection together with TConnectionBroker for in-process or out-of-process communication via the IAppServer interface. Several other DataSnap connection components also support IAppServer; check the Delphi help for details.
New DataSnap from Delphi 2009
Previously, TSQLConnection was used in the DataSnap server only. In the new DataSnap, we may also use TSQLConnection in the DataSnap client. A new driver called DataSnap allows us to connect to a DataSnap server via either TCP or HTTP, using REST data packets, for multi-tier applications. Furthermore, we may connect to a TDSServer in-process by setting TSQLConnection.DriverName to the server's name (TDSServer.Name). This lets us write a scalable multi-tier DataSnap application that consumes server methods. See here for more details.
In Delphi 2009/2010, a new DataSnap connection component, TDSProviderConnection, was introduced. As the name implies, it supplies providers from a DataSnap server. This connection requires a TSQLConnection instance in the client tier. Thus, we may use a single TSQLConnection in the client tier for either in-process or out-of-process access, which fulfills the design philosophy of a scalable multi-tier DataSnap application.
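To make the "single TSQLConnection" idea concrete, here is a minimal sketch of switching one connection between the two modes. The unit name and the helper ConfigureDataSnapConnection are hypothetical. The parameter names ('HostName', 'Port', 'CommunicationProtocol') and the port value 211 are assumptions based on the usual DataSnap driver defaults, so verify them against your Delphi version. The project must also link the units that register the drivers (assumed here to be DbxDatasnap for the remote driver and DSServer for the in-process one).

```pascal
unit DataSnapConnSketch;

interface

uses
  SqlExpr;

// Hypothetical helper, not part of the DataSnap framework.
procedure ConfigureDataSnapConnection(Conn: TSQLConnection; InProcess: Boolean);

implementation

procedure ConfigureDataSnapConnection(Conn: TSQLConnection; InProcess: Boolean);
begin
  Conn.Connected := False;
  Conn.LoginPrompt := False;
  if InProcess then
    // 'DSServer' is the driver name this article uses for the in-process server.
    Conn.DriverName := 'DSServer'
  else
  begin
    // Set DriverName first: assigning it may reload the parameter list.
    Conn.DriverName := 'DataSnap';
    Conn.Params.Values['HostName'] := 'localhost';            // assumed host
    Conn.Params.Values['Port'] := '211';                      // assumed default TCP port
    Conn.Params.Values['CommunicationProtocol'] := 'tcp/ip';  // assumed parameter name
  end;
end;

end.
```

With a helper like this, the rest of the client (TDSProviderConnection, TClientDataSet) stays the same regardless of where the server lives.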
There are lots of demos and CodeRage videos available on the web showing how to use TDSProviderConnection in the DataSnap client tier. However, most of the examples show only the out-of-process design. While writing this topic, I could not find a single example illustrating the usage of TDSProviderConnection for the in-process design. I hope more will come from other well-known Delphi fans.
At first, I thought it would be easy to use TDSProviderConnection for an in-process design. But I faced problems while following the rules. These problems appear to be related to bugs and immature design in the DataSnap framework. I will show here how to deal with them.
Design a DataSnap module
First, we design a simple DataSnap module for this example. It is a TDSServerModule descendant with two components: a TDataSetProvider and a TClientDataSet. The reason for using TDSServerModule is that it manages the providers defined in the module.
MyServerProvider.DFM
object ServerProvider: TServerProvider
  OldCreateOrder = False
  OnCreate = DSServerModuleCreate
  Height = 225
  Width = 474
  object DataSetProvider1: TDataSetProvider
    DataSet = ClientDataSet1
    Left = 88
    Top = 56
  end
  object ClientDataSet1: TClientDataSet
    Aggregates = <>
    Params = <>
    Left = 200
    Top = 56
  end
end
MyServerProvider.PAS
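unit MyServerProvider;

interface

uses
  SysUtils, Classes, DB, DBClient, Provider, DSServer;

type
  TServerProvider = class(TDSServerModule)
    DataSetProvider1: TDataSetProvider;
    ClientDataSet1: TClientDataSet;
    procedure DSServerModuleCreate(Sender: TObject);
  end;

implementation

{$R *.dfm}

procedure TServerProvider.DSServerModuleCreate(Sender: TObject);
begin
  // Load the sample data that DataSetProvider1 will serve
  ClientDataSet1.LoadFromFile('..\orders.cds');
end;

end.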
Define a transport layer for the provider module
Since this is an in-process application, we don’t really need a physical transport layer for the provider module. What we need here is a TDSServer and a TDSServerClass instance that help propagate the providers to the ClientDataSet at a later stage.
var C: TDSServer;
    D: TDSServerClass;
begin
  C := TDSServer.Create(nil);
  D := TDSServerClass.Create(nil);
  try
    D.Server := C;               // the server class attaches itself to the server
    D.OnGetClass := OnGetClass;  // tells the server which class to instantiate
    C.Start;
  finally
    D.Free;
    C.Free;
  end;
end;

procedure TForm1.OnGetClass(DSServerClass: TDSServerClass; var
  PersistentClass: TPersistentClass);
begin
  PersistentClass := TServerProvider;
end;
Use TDSProviderConnection to consume the in-process DataSnap service
Now we hook everything up in the DataSnap context to get it done:
var Q: TSQLConnection;
    D: TDSServer;
    C: TDSServerClass;
    P: TServerProvider;
    N: TDSProviderConnection;
begin
  P := TServerProvider.Create(nil);
  D := TDSServer.Create(nil);
  C := TDSServerClass.Create(nil);
  Q := TSQLConnection.Create(nil);
  N := TDSProviderConnection.Create(nil);
  try
    C.Server := D;
    C.OnGetClass := OnGetClass;
    D.Start;

    Q.DriverName := 'DSServer';
    Q.LoginPrompt := False;
    Q.Open;

    N.SQLConnection := Q;
    N.ServerClassName := 'TServerProvider';
    N.Connected := True;

    ClientDataSet1.RemoteServer := N;
    ClientDataSet1.ProviderName := 'DataSetProvider1';
    ClientDataSet1.Open;

    ShowMessage(IntToStr(ClientDataSet1.RecordCount));
  finally
    N.Free;
    Q.Free;
    C.Free;
    D.Free;
    P.Free;
  end;
end;
If you are using Delphi version 14.0.3513.24210 or earlier, you will find that it doesn’t work: an “Invalid pointer operation” exception is raised.
I have tracked down all the problems encountered so far, and the fixes are as follows.
Troubleshoot: Invalid pointer operation
There is a bug in DSUtil.StreamToDataPacket. I have filed a report as QC#78666.
Here is a fix that does not change the DBX source code:
unit DSUtil.QC78666;

interface

implementation

uses SysUtils, Variants, VarUtils, ActiveX, Classes, DBXCommonResStrs, DSUtil,
  CodeRedirect;

type
  THeader = class
  const
    Empty = 1;
    Variant = 2;
    DataPacket = 3;
  end;

  PIntArray = ^TIntArray;
  TIntArray = array[0..0] of Integer;

  TVarFlag = (vfByRef, vfVariant);
  TVarFlags = set of TVarFlag;

  EInterpreterError = class(Exception);

  TVariantStreamer = class
  private
    class function ReadArray(VType: Integer; const Data: TStream): OleVariant;
  public
    class function ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
  end;

const
  EasyArrayTypes = [varSmallInt, varInteger, varSingle, varDouble, varCurrency,
    varDate, varBoolean, varShortInt, varByte, varWord, varLongWord];

  VariantSize: array[0..varLongWord] of Word = (0, 0, SizeOf(SmallInt), SizeOf(Integer),
    SizeOf(Single), SizeOf(Double), SizeOf(Currency), SizeOf(TDateTime), 0, 0,
    SizeOf(Integer), SizeOf(WordBool), 0, 0, 0, 0, SizeOf(ShortInt), SizeOf(Byte),
    SizeOf(Word), SizeOf(LongWord));

class function TVariantStreamer.ReadArray(VType: Integer; const Data: TStream): OleVariant;
var
  Flags: TVarFlags;
  LoDim, HiDim, Indices, Bounds: PIntArray;
  DimCount, VSize, i: Integer;
  V: OleVariant;
  LSafeArray: PSafeArray;
  P: Pointer;
begin
  VarClear(Result);
  Data.Read(DimCount, SizeOf(DimCount));
  VSize := DimCount * SizeOf(Integer);
  GetMem(LoDim, VSize);
  try
    GetMem(HiDim, VSize);
    try
      Data.Read(LoDim^, VSize);
      Data.Read(HiDim^, VSize);
      GetMem(Bounds, VSize * 2);
      try
        for i := 0 to DimCount - 1 do
        begin
          Bounds[i * 2] := LoDim[i];
          Bounds[i * 2 + 1] := HiDim[i];
        end;
        Result := VarArrayCreate(Slice(Bounds^,DimCount * 2), VType and varTypeMask);
      finally
        FreeMem(Bounds);
      end;
      if VType and varTypeMask in EasyArrayTypes then
      begin
        Data.Read(VSize, SizeOf(VSize));
        P := VarArrayLock(Result);
        try
          Data.Read(P^, VSize);
        finally
          VarArrayUnlock(Result);
        end;
      end else
      begin
        LSafeArray := PSafeArray(TVarData(Result).VArray);
        GetMem(Indices, VSize);
        try
          FillChar(Indices^, VSize, 0);
          for I := 0 to DimCount - 1 do
            Indices[I] := LoDim[I];
          while True do
          begin
            V := ReadVariant(Flags, Data);
            if VType and varTypeMask = varVariant then
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, V))
            else
              SafeArrayCheck(SafeArrayPutElement(LSafeArray, Indices^, TVarData(V).VPointer^));
            Inc(Indices[DimCount - 1]);
            if Indices[DimCount - 1] > HiDim[DimCount - 1] then
              for i := DimCount - 1 downto 0 do
                if Indices[i] > HiDim[i] then
                begin
                  if i = 0 then Exit;
                  Inc(Indices[i - 1]);
                  Indices[i] := LoDim[i];
                end;
          end;
        finally
          FreeMem(Indices);
        end;
      end;
    finally
      FreeMem(HiDim);
    end;
  finally
    FreeMem(LoDim);
  end;
end;

class function TVariantStreamer.ReadVariant(out Flags: TVarFlags; const Data: TStream): OleVariant;
var
  I, VType: Integer;
  W: WideString;
  TmpFlags: TVarFlags;
begin
  VarClear(Result);
  Flags := [];
  Data.Read(VType, SizeOf(VType));
  if VType and varByRef = varByRef then
    Include(Flags, vfByRef);
  if VType = varByRef then
  begin
    Include(Flags, vfVariant);
    Result := ReadVariant(TmpFlags, Data);
    Exit;
  end;
  if vfByRef in Flags then
    VType := VType xor varByRef;
  if (VType and varArray) = varArray then
    Result := ReadArray(VType, Data) else
  case VType and varTypeMask of
    varEmpty: VarClear(Result);
    varNull: Result := NULL;
    varOleStr:
      begin
        Data.Read(I, SizeOf(Integer));
        SetLength(W, I);
        Data.Read(W[1], I * 2);
        Result := W;
      end;
    varDispatch, varUnknown:
      raise EInterpreterError.CreateResFmt(@SBadVariantType,[IntToHex(VType,4)]);
  else
    TVarData(Result).VType := VType;
    Data.Read(TVarData(Result).VPointer, VariantSize[VType and varTypeMask]);
  end;
end;

procedure StreamToDataPacket(const Stream: TStream; out VarBytes: OleVariant);
var
  P: Pointer;
  ByteCount: Integer;
  Size: Int64;
begin
  Stream.Read(Size, 8);
  ByteCount := Integer(Size);
  if ByteCount > 0 then
  begin
    VarBytes := VarArrayCreate([0, ByteCount-1], varByte);
    P := VarArrayLock(VarBytes);
    try
      // Stream.Position := 0; // QC#78666 "Mismatched in datapacket" with DSUtil.StreamToDataPacket
      Stream.Read(P^, ByteCount);
      Stream.Position := 0;
    finally
      VarArrayUnlock(VarBytes);
    end;
  end
  else
    VarBytes := Null;
end;

procedure StreamToVariantPatch(const Stream: TStream; out VariantValue: OleVariant);
var
  Flags: TVarFlags;
  Header: Byte;
begin
  if Assigned(Stream) then
  begin
    Stream.Position := 0;
    Stream.Read(Header, 1);
    if Header = THeader.Variant then
      VariantValue := TVariantStreamer.ReadVariant(Flags, Stream)
    else if Header = THeader.DataPacket then
      StreamToDataPacket(Stream, VariantValue)
    else
      Assert(false);
  end;
end;

var QC78666: TCodeRedirect;

initialization
  QC78666 := TCodeRedirect.Create(@StreamToVariant, @StreamToVariantPatch);
finalization
  QC78666.Free;
end.
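The patch units in this post depend on a CodeRedirect unit whose source is not listed here. As a rough illustration of how such run-time redirection can work, below is a minimal sketch that overwrites the first five bytes of the old routine with a relative JMP to the new one and restores them on destruction. The unit and class names (CodeRedirectSketch, TCodeRedirectSketch) are hypothetical and this is not the author's TCodeRedirect. It assumes 32-bit x86 Windows and an application built without runtime packages, since with packages the procedure address may point to an import thunk instead of the routine itself.

```pascal
unit CodeRedirectSketch;

interface

type
  // Layout of an x86 "JMP rel32" instruction.
  TJumpRec = packed record
    OpCode: Byte;     // $E9
    Offset: Integer;  // relative to the end of this instruction
  end;

  // Hypothetical sketch; 32-bit x86 only.
  TCodeRedirectSketch = class
  private
    FTarget: Pointer;
    FSaved: TJumpRec;
    procedure WriteBytes(const Rec: TJumpRec);
  public
    constructor Create(OldProc, NewProc: Pointer);
    destructor Destroy; override;
  end;

implementation

uses
  Windows;

procedure TCodeRedirectSketch.WriteBytes(const Rec: TJumpRec);
var
  OldProtect: DWORD;
begin
  // Code pages are normally read-only, so make them writable temporarily.
  if VirtualProtect(FTarget, SizeOf(TJumpRec), PAGE_EXECUTE_READWRITE, OldProtect) then
  begin
    Move(Rec, FTarget^, SizeOf(TJumpRec));
    VirtualProtect(FTarget, SizeOf(TJumpRec), OldProtect, OldProtect);
    FlushInstructionCache(GetCurrentProcess, FTarget, SizeOf(TJumpRec));
  end;
end;

constructor TCodeRedirectSketch.Create(OldProc, NewProc: Pointer);
var
  Jump: TJumpRec;
begin
  inherited Create;
  FTarget := OldProc;
  // Keep the original bytes so the routine can be restored later.
  Move(FTarget^, FSaved, SizeOf(TJumpRec));
  Jump.OpCode := $E9;
  Jump.Offset := Integer(NewProc) - Integer(OldProc) - SizeOf(TJumpRec);
  WriteBytes(Jump);
end;

destructor TCodeRedirectSketch.Destroy;
begin
  if FTarget <> nil then
    WriteBytes(FSaved);
  inherited Destroy;
end;

end.
```

Because the old routine's entry point is overwritten, the replacement must have exactly the same signature and calling convention as the routine it replaces, which is why the patch procedures in this post mirror the originals.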
Troubleshoot: I still encounter “Invalid Pointer Operation” after applying the DSUtil.StreamToDataPacket patch
I have filed this problem as QC#78752. An in-process DataSnap connection creates an instance of TDSServerCommand. One method of TDSServerCommand creates a TDBXNoOpRow instance:
function TDSServerCommand.CreateParameterRow: TDBXRow;
begin
  Result := TDBXNoOpRow.Create(FDbxContext);
end;
Most of the methods in TDBXNoOpRow are not implemented. Two of its methods, GetStream and SetStream, are used in subsequent operations, and this is what causes the exception.
After fixing the TDBXNoOpRow problem, the data packet is transported to the ClientDataSet successfully.
The fix is as follows:
unit DBXCommonServer.QC78752;

interface

uses SysUtils, Classes, DBXCommon, DSCommonServer, DBXCommonTable;

type
  TDSServerCommand_Patch = class(TDSServerCommand)
  protected
    function CreateParameterRowPatch: TDBXRow;
  end;

  TDBXNoOpRowPatch = class(TDBXNoOpRow)
  private
    function GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes): Integer;
  protected
    procedure GetStream(DbxValue: TDBXStreamValue; var Stream: TStream; var IsNull:
      LongBool); override;
    procedure SetStream(DbxValue: TDBXStreamValue; StreamReader: TDBXStreamReader);
      override;
    function UseExtendedTypes: Boolean; override;
  end;

  TDBXStreamValueAccess = class(TDBXByteArrayValue)
  private
    FStreamStreamReader: TDBXLookAheadStreamReader;
  end;

implementation

uses CodeRedirect;

function TDSServerCommand_Patch.CreateParameterRowPatch: TDBXRow;
begin
  Result := TDBXNoOpRowPatch.Create(FDbxContext);
end;

procedure TDBXNoOpRowPatch.GetStream(DbxValue: TDBXStreamValue; var Stream: TStream;
  var IsNull: LongBool);
var iSize: integer;
    B: TBytes;
begin
  iSize := GetBytesFromStreamReader(TDBXStreamValueAccess(DbxValue).FStreamStreamReader, B);
  IsNull := iSize = 0;
  if not IsNull then begin
    Stream := TMemoryStream.Create;
    Stream.Write(B[0], iSize);
  end;
end;

procedure TDBXNoOpRowPatch.SetStream(DbxValue: TDBXStreamValue; StreamReader:
  TDBXStreamReader);
var B: TBytes;
    iSize: integer;
begin
  iSize := GetBytesFromStreamReader(StreamReader, B);
  Dbxvalue.SetDynamicBytes(0, B, 0, iSize);
end;

function TDBXNoOpRowPatch.GetBytesFromStreamReader(const R: TDBXStreamReader; out Buf: TBytes):
  Integer;
const BufSize = 50 * 1024;
var iPos: integer;
    iRead: integer;
begin
  Result := 0;
  while not R.Eos do begin
    SetLength(Buf, Result + BufSize);
    iPos := Result;
    iRead := R.Read(Buf, iPos, BufSize);
    Inc(Result, iRead);
  end;
  SetLength(Buf, Result);
end;

function TDBXNoOpRowPatch.UseExtendedTypes: Boolean;
begin
  Result := True;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDSServerCommand_Patch.CreateParameterRow, @TDSServerCommand_Patch.CreateParameterRowPatch);
finalization
  QC78752.Free;
end.
Troubleshoot: Both patches are applied and work for the example, but I still encounter “Invalid Pointer Operation”
This problem is also filed in QC#78752. It is caused by the following two methods:
- procedure TDBXStreamValue.SetValue
- function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
TDBXLookAheadStreamReader.ConvertToMemoryStream returns its managed FStream object to TDBXStreamValue.SetValue. This stream object then becomes a managed object of TDBXStreamValue as well. As a result, one stream object is managed by two objects, and the exception is raised when both of them attempt to free it:
procedure TDBXStreamValue.SetValue(const Value: TDBXValue);
begin
  if Value.IsNull then
    SetNull
  else
  begin
    SetStream(Value.GetStream(False), True);
  end;
end;

function TDBXLookAheadStreamReader.ConvertToMemoryStream: TStream;
...
begin
  if FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (FStream is TMemoryStream) then
    begin
      ...
      StreamTemp := FStream;
      FStream := Stream;
      FreeAndNil(StreamTemp);
    end;
    FStream.Seek(0, soFromBeginning);
    FHasLookAheadByte := false;
    Result := FStream;
  end;
end;
The fix is as follows:
unit DBXCommon.QC78752;

interface

implementation

uses SysUtils, Classes, DBXCommon, CodeRedirect;

type
  TDBXLookAheadStreamReaderAccess = class(TDBXStreamReader)
  private
    FStream: TStream;
    FEOS: Boolean;
    FHasLookAheadByte: Boolean;
    FLookAheadByte: Byte;
  end;

  TDBXLookAheadStreamReaderHelper = class helper for TDBXLookAheadStreamReader
  private
    function Accessor: TDBXLookAheadStreamReaderAccess;
  public
    function ConvertToMemoryStreamPatch: TStream;
  end;

function TDBXLookAheadStreamReaderHelper.Accessor:
  TDBXLookAheadStreamReaderAccess;
begin
  Result := TDBXLookAheadStreamReaderAccess(Self);
end;

function TDBXLookAheadStreamReaderHelper.ConvertToMemoryStreamPatch: TStream;
var
  Stream: TMemoryStream;
  StreamTemp: TStream;
  Count: Integer;
  Buffer: TBytes;
  ReadBytes: Integer;
begin
  if Accessor.FStream = nil then
    Result := nil
  else
  begin
    Count := Size;
    if not (Accessor.FStream is TMemoryStream) then
    begin
      Stream := TMemoryStream.Create;
      if Count >= 0 then
        Stream.SetSize(Count);
      if Accessor.FHasLookAheadByte then
        Stream.Write(Accessor.FLookAheadByte, 1);
      SetLength(Buffer, 256);
      while true do
      begin
        // Pass Buffer[0]: TStream.Read/Write take an untyped parameter, so
        // passing the dynamic array itself would hand over its pointer variable.
        ReadBytes := Accessor.FStream.Read(Buffer[0], Length(Buffer));
        if ReadBytes > 0 then
          Stream.Write(Buffer[0], ReadBytes)
        else
          Break;
      end;
      StreamTemp := Accessor.FStream;
      Accessor.FStream := Stream;
      FreeAndNil(StreamTemp);
      Result := Accessor.FStream;
    end else begin
      // Hand out a copy so that the reader and the caller each own their own stream
      Stream := TMemoryStream.Create;
      Accessor.FStream.Seek(0, soFromBeginning);
      Stream.CopyFrom(Accessor.FStream, Accessor.FStream.Size);
    end;
    Stream.Seek(0, soFromBeginning);
    Accessor.FHasLookAheadByte := false;
    Result := Stream;
    // Stream := TMemoryStream.Create;
    // Stream.LoadFromStream(FStream);
    // FStream.Seek(0, soFromBeginning);
    // Result := Stream;
  end;
end;

var QC78752: TCodeRedirect;

initialization
  QC78752 := TCodeRedirect.Create(@TDBXLookAheadStreamReader.ConvertToMemoryStream, @TDBXLookAheadStreamReader.ConvertToMemoryStreamPatch);
finalization
  QC78752.Free;
end.
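The double-ownership problem described in this section can be looked at in isolation from DataSnap. The console program below is a minimal sketch using only the RTL: TStreamHolder and CloneStream are hypothetical names standing in for "any object that frees the stream it holds" and "give the second owner its own copy". It is not DBX code; it only illustrates the ownership rule that the patch relies on when it copies the memory stream.

```pascal
program StreamOwnershipSketch;

{$APPTYPE CONSOLE}

uses
  SysUtils, Classes;

type
  // Hypothetical stand-in for an object that frees the stream it holds.
  TStreamHolder = class
  private
    FStream: TStream;
  public
    constructor Create(AStream: TStream);
    destructor Destroy; override;
    property Stream: TStream read FStream;
  end;

constructor TStreamHolder.Create(AStream: TStream);
begin
  inherited Create;
  FStream := AStream;
end;

destructor TStreamHolder.Destroy;
begin
  FStream.Free;
  inherited Destroy;
end;

// Returns an independent copy that the caller owns outright.
function CloneStream(Source: TStream): TStream;
begin
  Result := TMemoryStream.Create;
  try
    Source.Position := 0;
    Result.CopyFrom(Source, Source.Size);
    Result.Position := 0;
    Source.Position := 0;
  except
    Result.Free;
    raise;
  end;
end;

var
  Reader, Value: TStreamHolder;
  Payload: Integer;
begin
  Payload := 12345;
  Reader := TStreamHolder.Create(TMemoryStream.Create);
  try
    Reader.Stream.WriteBuffer(Payload, SizeOf(Payload));
    // Passing Reader.Stream itself here would make both destructors
    // free the same instance; a clone keeps the two owners independent.
    Value := TStreamHolder.Create(CloneStream(Reader.Stream));
    try
      Writeln('Reader bytes: ', Reader.Stream.Size);
      Writeln('Value bytes: ', Value.Stream.Size);
    finally
      Value.Free;
    end;
  finally
    Reader.Free;
  end;
end.
```

The cost of this approach is one extra copy of the data per hand-over, which is usually acceptable for data packets of moderate size.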
Troubleshoot: I encounter memory leaks after closing the application
There is a memory leak in TDSServerConnection for in-process connections. I have filed a report as QC#78696.
Here is the fix:
unit DSServer.QC78696;

interface

implementation

uses SysUtils,
  DBXCommon, DSServer, DSCommonServer, DBXMessageHandlerCommon, DBXSqlScanner,
  DBXTransport,
  CodeRedirect;

type
  TDSServerConnectionHandlerAccess = class(TDBXConnectionHandler)
    FConProperties: TDBXProperties;
    FConHandle: Integer;
    FServer: TDSCustomServer;
    FDatabaseConnectionHandler: TObject;
    FHasServerConnection: Boolean;
    FInstanceProvider: TDSHashtableInstanceProvider;
    FCommandHandlers: TDBXCommandHandlerArray;
    FLastCommandHandler: Integer;
    FNextHandler: TDBXConnectionHandler;
    FErrorMessage: TDBXErrorMessage;
    FScanner: TDBXSqlScanner;
    FDbxConnection: TDBXConnection;
    FTransport: TDSServerTransport;
    FChannel: TDbxChannel;
    FCreateInstanceEventObject: TDSCreateInstanceEventObject;
    FDestroyInstanceEventObject: TDSDestroyInstanceEventObject;
    FPrepareEventObject: TDSPrepareEventObject;
    FConnectEventObject: TDSConnectEventObject;
    FErrorEventObject: TDSErrorEventObject;
    FServerCon: TDSServerConnection;
  end;

  TDSServerConnectionPatch = class(TDSServerConnection)
  public
    destructor Destroy; override;
  end;

  TDSServerDriverPatch = class(TDSServerDriver)
  protected
    function CreateConnectionPatch(ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
  end;

destructor TDSServerConnectionPatch.Destroy;
begin
  inherited Destroy;
  TDSServerConnectionHandlerAccess(ServerConnectionHandler).FServerCon := nil;
  ServerConnectionHandler.Free;
end;

function TDSServerDriverPatch.CreateConnectionPatch(
  ConnectionBuilder: TDBXConnectionBuilder): TDBXConnection;
begin
  Result := TDSServerConnectionPatch.Create(ConnectionBuilder);
end;

var QC78696: TCodeRedirect;

initialization
  QC78696 := TCodeRedirect.Create(@TDSServerDriverPatch.CreateConnection, @TDSServerDriverPatch.CreateConnectionPatch);
finalization
  QC78696.Free;
end.