MQTT 3.1.1 的 Delphi 实现(XE7,跨平台)

2016-08-22 19:17:31来源:CSDN作者:oYaoWang123人点击

需要做一个APP控制设备的程序,思来想去放弃自己实现服务端,准备直接采用现成的MQTT服务端程序,自己只需要关心逻辑,传输的部分交给MQTT。网上能找到的Delphi版本是老外基于一个第三方网络库实现的,Win32下可以编译运行,但需要修改部分AnsiString和WideString。测试的时候会掉线,这可能是因为当时没细读协议,在规定时间内未发送心跳包,被服务端断开。改成Android版本时死活没能编译通过,属于技术问题,遂放弃。准备自己读协议,重新用TIdTCPClient实现一遍,这样可以方便地跨平台,也不依赖第三方控件,同时深入理解一下MQTT。代码基本参考老外的版本。先弄个能用的测试代码出来。服务端使用mosquitto。客户端现已完成MQTT连接和MQTT心跳包。
unit uMain;

{ Minimal MQTT 3.1.1 test client built on Indy's TIdTCPClient and an FMX form.

  Packet layout used throughout this unit:
    [Fixed header] + [Variable header] + [Payload]
    Fixed header = first byte (type/flags) + Remaining Length (1..4 bytes).

  Implemented so far: CONNECT (with user name / password), CONNACK handling,
  PINGREQ and PINGRESP handling. PUBLISH / SUBACK / UNSUBACK are recognised
  but not processed yet. }

interface

uses
  System.SysUtils, System.Types, System.UITypes, System.Classes, System.Variants, IdGlobal,
  FMX.Types, FMX.Controls, FMX.Forms, FMX.Graphics, FMX.Dialogs, FMX.StdCtrls,
  IdBaseComponent, IdComponent, IdTCPConnection, IdTCPClient, FMX.Layouts, FMX.Memo,
  FMX.Controls.Presentation, FMX.Edit;

const
  FClientID = 'tndsoft'; // Client identifier; must be unique, to be replaced by the device ID

type
  // Message type. 4 bit unsigned.
  TMQTTMessageType = (
    Reserved0,   //  0  Reserved
    CONNECT,     //  1  Client request to connect to Broker
    CONNACK,     //  2  Connect Acknowledgment
    PUBLISH,     //  3  Publish message
    PUBACK,      //  4  Publish Acknowledgment
    PUBREC,      //  5  Publish Received (assured delivery part 1)
    PUBREL,      //  6  Publish Release (assured delivery part 2)
    PUBCOMP,     //  7  Publish Complete (assured delivery part 3)
    SUBSCRIBE,   //  8  Client Subscribe request
    SUBACK,      //  9  Subscribe Acknowledgment
    UNSUBSCRIBE, // 10  Client Unsubscribe request
    UNSUBACK,    // 11  Unsubscribe Acknowledgment
    PINGREQ,     // 12  PING Request
    PINGRESP,    // 13  PING Response
    DISCONNECT,  // 14  Client is Disconnecting
    Reserved15   // 15  Reserved
  );

  TRemainingLength = Array of Byte; // Remaining Length field, variable size, 1 to 4 bytes
  TUTF8Text = Array of Byte;

type
  // One received MQTT packet, split into its three raw parts.
  TMQTTMessage = Record
    FixedHeader: Byte; // first byte: message type (high nibble) and flags
    RL: TBytes;        // encoded Remaining Length bytes as read from the wire
    Data: TBytes;      // variable header + payload (Remaining Length bytes)
  End;

  { Events }
  TConnAckEvent = procedure (Sender: TObject; ReturnCode: integer) of object;

type
  TfMQTTForDelphi = class(TForm)
    Memo1: TMemo;
    IdTCPClient1: TIdTCPClient;
    Button1: TButton;
    Button2: TButton;
    edtHost: TEdit;
    edtPort: TEdit;
    Button3: TButton;
    Button4: TButton;
    procedure IdTCPClient1Connected(Sender: TObject);
    procedure IdTCPClient1Disconnected(Sender: TObject);
    procedure Button1Click(Sender: TObject);
    procedure Button2Click(Sender: TObject);
    procedure Button3Click(Sender: TObject);
    procedure Button4Click(Sender: TObject);
  private
    { Private declarations }
  public
    { Public declarations }
  end;

type
  // Background reader: pulls MQTT packets off the socket and hands each one
  // to HandleInput on the main thread.
  TClientHandleThread = class(TThread)
  private
    { Private declarations }
    FCurrentData: TMQTTMessage;
    FConnAckEvent: TConnAckEvent; // fired when a CONNACK is received
    procedure HandleInput;
  protected
    procedure Execute; override;
  public
    property OnConnAck: TConnAckEvent read FConnAckEvent write FConnAckEvent;
  end;

var
  fMQTTForDelphi: TfMQTTForDelphi;
  myClientHandleThread: TClientHandleThread;

function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: Word): Byte;
function VariableHeader_Connect(KeepAlive: Word): TBytes;
procedure CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);
function StrToBytes(str: string; perpendLength: boolean): TUTF8Text;
procedure AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
function RemainingLength(x: Integer): TRemainingLength;
function BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength; VariableHead: TBytes; Payload: Array of Byte): TBytes;
procedure AppendBytes(var DestArray: TBytes; const NewBytes: TBytes);
function RemainingLengthToInt(RLBytes: TBytes): Integer;

implementation

{$R *.fmx}
{$R *.NmXhdpiPh.fmx ANDROID}
{$R *.XLgXhdpiTb.fmx ANDROID}
{$R *.SmXhdpiPh.fmx ANDROID}

{ Reads packets in a loop until the thread is terminated or the TCP client
  reports that it is no longer connected. Each packet is parsed into
  FixedHeader / RL / Data and dispatched through Synchronize(HandleInput). }
procedure TClientHandleThread.Execute;
var
  CurrentMessage: TMQTTMessage;
  vBuffer: TIdBytes;
  Buffer: TBytes;
  RLInt: Integer;
  I: Integer;
begin
  SetLength(Buffer, 1);
  while not Terminated do
  begin
    if not fMQTTForDelphi.IdTCPClient1.Connected then
      Terminate
    else
      try
        CurrentMessage.FixedHeader := 0;
        CurrentMessage.RL := nil;
        CurrentMessage.Data := nil;

        // First byte of the fixed header (message type + flags).
        CurrentMessage.FixedHeader := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;

        { Read the encoded Remaining Length: 1 to 4 bytes, each with a
          continuation flag in bit 7. }
        SetLength(CurrentMessage.RL, 1);
        CurrentMessage.RL[0] := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;
        for I := 1 to 3 do
        begin
          if (CurrentMessage.RL[I - 1] and 128) = 0 then
            Break;
          Buffer[0] := fMQTTForDelphi.IdTCPClient1.IOHandler.ReadByte;
          AppendBytes(CurrentMessage.RL, Buffer);
        end;

        { Decode the Remaining Length. }
        RLInt := RemainingLengthToInt(CurrentMessage.RL);

        { Read exactly RLInt bytes of variable header + payload. }
        if RLInt > 0 then
        begin
          // AAppend = False: vBuffer lives across loop iterations, so appending
          // would glue this packet's bytes onto the previous packet's.
          fMQTTForDelphi.IdTCPClient1.IOHandler.ReadBytes(vBuffer, RLInt, False);
          CurrentMessage.Data := TBytes(vBuffer);
        end;

        FCurrentData := CurrentMessage;
        Synchronize(HandleInput);
      except
        // Best effort: read errors (e.g. the peer closing the connection) are
        // swallowed here; the Connected check at the top of the loop ends the
        // thread once the socket is reported as disconnected.
      end;
  end;
end;

{ Runs on the main thread (via Synchronize). Interprets the packet stored in
  FCurrentData and reports CONNACK / PINGRESP in the form's memo. }
procedure TClientHandleThread.HandleInput;
var
  MessageType: Byte;
  sErrCode: string;
  ConnectReturn: Integer;
begin
  if FCurrentData.FixedHeader = 0 then
    Exit;

  // The message type is the high nibble of the first header byte.
  MessageType := FCurrentData.FixedHeader shr 4;

  if MessageType = Ord(CONNACK) then
  begin
    ConnectReturn := 0;
    // CONNACK carries two bytes: acknowledge flags, then the return code.
    // Any return code except 0 is an error.
    if Length(FCurrentData.Data) >= 2 then
    begin
      ConnectReturn := FCurrentData.Data[1];
      case ConnectReturn of
        0: sErrCode := '连接已接受';
        1: sErrCode := '连接已拒绝,不支持的协议版本';
        2: sErrCode := '连接已拒绝,不合格的客户端标识符';
        3: sErrCode := '连接已拒绝,服务端不可用';
        4: sErrCode := '连接已拒绝,无效的用户名或密码';
        5: sErrCode := '连接已拒绝,未授权';
      else
        sErrCode := '连接已拒绝,未知返回码 ' + IntToStr(ConnectReturn);
      end;
      fMQTTForDelphi.Memo1.Lines.Add(sErrCode);
    end;
    if Assigned(FConnAckEvent) then
      FConnAckEvent(Self, ConnectReturn);
  end
  else if MessageType = Ord(PUBLISH) then
  begin
    // TODO: not implemented yet. Sketch carried over from the reference client:
    //   // Read the Length Bytes
    //   DataLen := BytesToStrLength(Copy(FCurrentData.Data, 0, 2));
    //   // Get the Topic
    //   SetString(Topic, PAnsiChar(@FCurrentData.Data[2]), DataLen);
    //   // Get the Payload
    //   SetString(Payload, PAnsiChar(@FCurrentData.Data[2 + DataLen]), (Length(FCurrentData.Data) - 2 - DataLen));
    //   if Assigned(OnPublish) then OnPublish(Self, Topic, Payload);
  end
  else if MessageType = Ord(SUBACK) then
  begin
    // TODO: not implemented yet. Sketch carried over from the reference client:
    //   // Reading the Message ID
    //   ResponseVH := Copy(FCurrentData.Data, 0, 2);
    //   DataLen := BytesToStrLength(ResponseVH);
    //   // Next Read the Granted QoS
    //   QoS := 0;
    //   if (Length(FCurrentData.Data) - 2) > 0 then
    //     begin
    //       ResponseVH := Copy(FCurrentData.Data, 2, 1);
    //       QoS := ResponseVH[0];
    //     end;
    //   if Assigned(OnSubAck) then OnSubAck(Self, DataLen, QoS);
  end
  else if MessageType = Ord(UNSUBACK) then
  begin
    // TODO: not implemented yet. Sketch carried over from the reference client:
    //   // Read the Message ID for the event handler
    //   ResponseVH := Copy(FCurrentData.Data, 0, 2);
    //   DataLen := BytesToStrLength(ResponseVH);
    //   if Assigned(OnUnSubAck) then OnUnSubAck(Self, DataLen);
  end
  else if MessageType = Ord(PINGRESP) then
  begin
    fMQTTForDelphi.Memo1.Lines.Add('收到心跳应答包。');
    // if Assigned(OnPingResp) then OnPingResp(Self);
  end;
end;

{ Opens the TCP connection to the host/port typed into the edit boxes. }
procedure TfMQTTForDelphi.Button1Click(Sender: TObject);
begin
  IdTCPClient1.Host := edtHost.Text;
  IdTCPClient1.Port := StrToInt(edtPort.Text);
  IdTCPClient1.Connect;
end;

{ Starts the background reader thread. }
procedure TfMQTTForDelphi.Button2Click(Sender: TObject);
begin
  // Create suspended and call Start: the parameterless constructor already
  // runs the thread, and TThread.Resume is deprecated.
  myClientHandleThread := TClientHandleThread.Create(True);
  myClientHandleThread.Start;
end;

{ Builds and sends the MQTT CONNECT packet. }
procedure TfMQTTForDelphi.Button3Click(Sender: TObject);
var
  MqttData_FixedHead: Byte;                   // fixed header, first byte
  MqttData_VariableHeader: TBytes;            // variable header
  MqttData_RemainingLength: TRemainingLength; // encoded Remaining Length
  Payload: TUTF8Text;
  SendData: TBytes;                           // fully assembled packet to send
begin
  MqttData_FixedHead := FixedHeader(CONNECT, 0, 0, 0);
  // Keep-alive in seconds: the client has to send a PINGREQ within this
  // interval, otherwise the server drops the network connection.
  MqttData_VariableHeader := VariableHeader_Connect(40);

  { Payload: because user name / password authentication is requested, the
    payload is client ID, user name, password (each length-prefixed). }
  SetLength(Payload, 0);
  AppendArray(Payload, StrToBytes(FClientID, true)); // id
  AppendArray(Payload, StrToBytes('ade', true));     // user
  AppendArray(Payload, StrToBytes('ade', true));     // pwd

  { Remaining Length = variable header + payload. }
  MqttData_RemainingLength := RemainingLength(Length(MqttData_VariableHeader) + Length(Payload));

  { Assemble }
  SendData := BuildCommand(MqttData_FixedHead, MqttData_RemainingLength, MqttData_VariableHeader, Payload);

  { Send }
  IdTCPClient1.Socket.Write(TIdBytes(SendData));
end;

{ Sends a PINGREQ (heartbeat): fixed header byte followed by Remaining Length 0. }
procedure TfMQTTForDelphi.Button4Click(Sender: TObject);
var
  Data: TBytes;
begin
  SetLength(Data, 2);
  Data[0] := FixedHeader(PINGREQ, 0, 0, 0);
  Data[1] := 0;
  IdTCPClient1.Socket.Write(TIdBytes(Data));
end;

procedure TfMQTTForDelphi.IdTCPClient1Connected(Sender: TObject);
begin
  Memo1.Lines.Add('连接成功');
end;

procedure TfMQTTForDelphi.IdTCPClient1Disconnected(Sender: TObject);
begin
  Memo1.Lines.Add('连接断开');
end;

{ Builds the first byte of the fixed header.

    bit    | 7 6 5 4      | 3        | 2 1       | 0      |
    byte 1 | Message Type | DUP flag | QoS level | RETAIN |

  Dup and Retain are reduced to one bit and Qos to two bits so that an
  out-of-range argument cannot spill into a neighbouring field. }
function FixedHeader(MessageType: TMQTTMessageType; Dup, Qos, Retain: Word): Byte;
begin
  Result := Byte((Ord(MessageType) shl 4) or ((Dup and 1) shl 3) or ((Qos and 3) shl 1) or (Retain and 1));
end;

{ Encodes x as an MQTT Remaining Length: 7 data bits per byte, bit 7 set on
  every byte except the last. Returns a single zero byte for x <= 0. }
function RemainingLength(x: Integer): TRemainingLength;
var
  digit: Integer;
begin
  if x < 0 then
    x := 0;
  SetLength(Result, 0);
  repeat
    digit := x mod 128;
    x := x div 128;
    if x > 0 then
      digit := digit or 128; // more bytes follow
    SetLength(Result, Length(Result) + 1);
    Result[High(Result)] := Byte(digit);
  until x = 0;
end;

{ Builds the 10-byte CONNECT variable header:
    length-prefixed protocol name 'MQTT' (6 bytes) + protocol level (1) +
    connect flags (1) + keep-alive in seconds, big-endian (2). }
function VariableHeader_Connect(KeepAlive: Word): TBytes;
const
  MQTT_PROTOCOL = 'MQTT';
  MQTT_VERSION = 4;     // protocol level for v3.1.1 is 4, not 3
  CONNECT_FLAGS = $C2;  // 11000010b: User Name, Password and Clean Session set
var
  iByteIndex: integer;
  ProtoBytes: TUTF8Text;
begin
  SetLength(Result, 10);
  iByteIndex := 0;

  ProtoBytes := StrToBytes(MQTT_PROTOCOL, true);
  CopyIntoArray(Result, ProtoBytes, iByteIndex); // protocol name
  Inc(iByteIndex, Length(ProtoBytes));

  Result[iByteIndex] := MQTT_VERSION;            // protocol level
  Inc(iByteIndex);

  // Plain constant instead of inline assembler so the unit also builds on
  // non-x86 targets.
  Result[iByteIndex] := CONNECT_FLAGS;           // connect flags
  Inc(iByteIndex);

  Result[iByteIndex] := Byte(KeepAlive shr 8);   // keep-alive, high byte
  Inc(iByteIndex);
  Result[iByteIndex] := Byte(KeepAlive and $FF); // keep-alive, low byte
end;

{ Copies all of SourceArray into DestArray starting at StartIndex.
  The caller must have sized DestArray so that the copy fits. }
procedure CopyIntoArray(var DestArray: Array of Byte; SourceArray: Array of Byte; StartIndex: integer);
begin
  Assert(StartIndex >= 0);
  if Length(SourceArray) = 0 then
    Exit; // nothing to copy; also avoids indexing element 0 of an empty array
  Assert(StartIndex + Length(SourceArray) <= Length(DestArray));
  Move(SourceArray[0], DestArray[StartIndex], Length(SourceArray));
end;

{ Encodes str as UTF-8. When perpendLength is true the result is prefixed
  with the encoded byte count as a 2-byte big-endian integer.
  Uses TEncoding instead of per-character indexing, so the result does not
  depend on whether the compiler uses 0- or 1-based string indexes and
  non-ASCII characters are encoded rather than truncated. }
function StrToBytes(str: string; perpendLength: boolean): TUTF8Text;
var
  Encoded: TBytes;
  Offset: Integer;
begin
  Encoded := TEncoding.UTF8.GetBytes(str);
  if perpendLength then
  begin
    SetLength(Result, Length(Encoded) + 2);
    Result[0] := Byte((Length(Encoded) shr 8) and $FF);
    Result[1] := Byte(Length(Encoded) and $FF);
    Offset := 2;
  end
  else
  begin
    SetLength(Result, Length(Encoded));
    Offset := 0;
  end;
  if Length(Encoded) > 0 then
    Move(Encoded[0], Result[Offset], Length(Encoded));
end;

{ Appends Source to the end of Dest. }
procedure AppendArray(var Dest: TUTF8Text; Source: Array of Byte);
var
  DestLen: Integer;
begin
  if Length(Source) = 0 then
    Exit;
  DestLen := Length(Dest);
  SetLength(Dest, DestLen + Length(Source));
  Move(Source[0], Dest[DestLen], Length(Source));
end;

{ Assembles the final packet:
  fixed header byte + Remaining Length + variable header + payload. }
function BuildCommand(FixedHeader: Byte; RemainL: TRemainingLength; VariableHead: TBytes; Payload: Array of Byte): TBytes;
var
  iNextIndex: integer;
begin
  SetLength(Result, 1 + Length(RemainL) + Length(VariableHead) + Length(Payload));

  // Fixed header (1 byte)
  Result[0] := FixedHeader;
  iNextIndex := 1;

  // Remaining Length (1-4 bytes)
  CopyIntoArray(Result, RemainL, iNextIndex);
  Inc(iNextIndex, Length(RemainL));

  // Variable header
  CopyIntoArray(Result, VariableHead, iNextIndex);
  Inc(iNextIndex, Length(VariableHead));

  // Payload
  CopyIntoArray(Result, Payload, iNextIndex);
end;

{ Appends NewBytes to the end of DestArray. }
procedure AppendBytes(var DestArray: TBytes; const NewBytes: TBytes);
var
  DestLen: Integer;
begin
  if Length(NewBytes) = 0 then
    Exit;
  DestLen := Length(DestArray);
  SetLength(DestArray, DestLen + Length(NewBytes));
  // NewBytes is a dynamic array: Move needs its first element, not the
  // variable itself (which is only the pointer to the data).
  Move(NewBytes[0], DestArray[DestLen], Length(NewBytes));
end;

{ Decodes an MQTT Remaining Length (1..4 bytes, 7 data bits each, bit 7 =
  "more bytes follow"). Returns 0 for an empty input and stops at the end of
  the input even if the last byte still has the continuation bit set. }
function RemainingLengthToInt(RLBytes: TBytes): Integer;
var
  multi: integer;
  i: integer;
  digit: Byte;
begin
  Result := 0;
  multi := 1;
  i := 0;
  while i < Length(RLBytes) do
  begin
    digit := RLBytes[i];
    Result := Result + (digit and 127) * multi;
    if (digit and 128) = 0 then
      Break;
    multi := multi * 128;
    Inc(i);
  end;
end;

end.

{ MQTT packet format:
    [Fixed header] + [Variable header] + [Payload]
    [Fixed header]    : FixedHeader + RemainingLength
    [Variable header] : packet identifier
    [Payload]         : payload

  [CONNECT]: sent by the client to open a session. Format:
    [Fixed header]    : FixedHeader + RemainingLength
    [Variable header] : protocol name + protocol level + connect flags + keep alive
                        04MQTT + $4 + Flag + Time
    [Payload]         : ClientId + User + Pwd }
后来测试以上代码在win32正常,安卓平台需要修改一下字符串的编码函数。需要注意。

最新文章

123

最新摄影

微信扫一扫

第七城市微信公众平台