Automatic reconnect with Habari message broker client libraries

The next release of Habari message broker client libraries includes an example for automatic reconnect in case of connection loss.

It uses two nested loops, where the outer loop connects and disconnects, and the inner loops processes (sends or receives) messages. If an exception occurs while sending or receiving data on the TCP socket, the program leaves the inner loop, closes the connection and restarts the outer loop.

Shown below is the code for sending with reconnect. Code for receiving data is included in the registered version. Note that it uses the open source logging framework Log4D.

unit SendWithReconnect;

interface

var
  RunSendWithReconnect: Boolean;

procedure Run;

implementation

uses
  BTCommAdapterIndy, BTJMSInterfaces, BTJMSConnectionFactory,
  Log4D,
  SysUtils, Classes;

type
  TSendWithReconnectThread = class(TThread)
  private
    Logger: TLogLogger;

    // Habari Client API objects
    Factory: IConnectionFactory;
    Connection: IConnection;
    Session: ISession;
    Queue: IDestination;
    Producer: IMessageProducer;

    procedure Connect;
    procedure Disconnect;
  public
    constructor Create;

    procedure Execute; override;
  end;

{ TSendWithReconnectThread }

constructor TSendWithReconnectThread.Create;
begin
  inherited Create(False);
  FreeOnTerminate := True;

  Logger := TLogLogger.GetLogger(TSendWithReconnectThread);

  Factory := TBTJMSConnectionFactory.Create(DEFAULT_USER,
    DEFAULT_PASSWORD, DEFAULT_BROKER_URL);
end;

procedure TSendWithReconnectThread.Connect;
begin
  while RunSendWithReconnect do
  begin
    try
      Logger.Info('Trying to connect');

      Connection := Factory.CreateConnection;
      Connection.Start;

      Session := Connection.CreateSession(False, amAutoAcknowledge);
      Queue := Session.CreateQueue('SendWithReconnect');
      Producer := Session.CreateProducer(Queue);

      Logger.Info('Connected!');
      // connected -> exit
      Break;

    except
      on E: Exception do
      begin
        Logger.Warn('Exception in Connect: ' + E.Message);

        if Assigned(Connection) then
        begin
          Connection.Close;
        end;

        Sleep(1000);
      end;
    end;
  end;
end;

procedure TSendWithReconnectThread.Disconnect;
begin
  Logger.Info('Disconnect');
  try
    Connection.Close;
  except
    on E: Exception do
    begin
      Logger.Warn('Exception in Disconnect: ' + E.Message);
      Sleep(1000);
    end;
  end;
end;

procedure TSendWithReconnectThread.Execute;
var
  Msg: IMessage;
begin
  // outer loop connects and reconnects on errors
  while RunSendWithReconnect do
  begin
    Connect;

    // inner loop sends messages
    while RunSendWithReconnect do
    begin
      try
         Logger.Info('Send a message');
         Sleep(1000);

         Msg := Session.CreateTextMessage('testmessage');

         Producer.Send(Msg);

      except
        on E: Exception do
        begin
          Logger.Warn('Exception: ' + E.Message);

          // on exception leave the inner loop
          Break;
        end;
      end;
    end;

    Disconnect;
  end;
end;

procedure Run;
begin
  RunSendWithReconnect := True;

  // create and start the thread
  TSendWithReconnectThread.Create;
end;

end.
Advertisements