Ulf Wendel

PoC: Using a Group Communication System (Isis2) to improve MySQL Replication HA

Modern NoSQL solutions make good old MySQL Replication appear weak on High Availability (HA). Basically, MySQL users have three choices for MySQL Replication HA: give up on HA, believe that doubling single points of failure means HA, or go for a proper but complex solution. Yet, as the NoSQL world and the competition prove, solid HA can be dead simple: embed a Group Communication System (GCS) into MySQL! No single point of failure and near-zero client deployment are doable. In parts, the proposal surpasses Pacemaker/Corosync. Read on: story, slides, experimental code.

Free tip of the day: A Single Point of Failure cannot cure a SPOF

If you were on a sailing boat on the wide, wide ocean, and the captain was the only one who knew how to sail, would you feel safe? No? Me neither. The captain is a Single Point of Failure (SPOF). So is the master/primary in a MySQL Replication cluster.

Out-of-the-box MySQL Replication setup
Writes          Reads
Master (SPOF)   Slave (one of many)
                Slave
                Slave

If the boat was monitored by a helicopter that will eventually fail or lose sight of the ship, would you feel significantly safer? No? Me neither. The helicopter is a single point of failure. So is MHA or mysqlfailover (now GA!) when used for monitoring of the master. Using a single monitor to trigger MySQL replication master failover means doubling the number of SPOFs!

MySQL Replication setup aiming at 99.9% HA
                                         Writes          Reads
Monitor, e.g. mysqlfailover/MHA (SPOF)   Master (SPOF)   Slave (one of many)
                                                         Slave
                                                         Slave

Of course, I’m overdoing it. If the probability of one system failing is 1:1000, then the probability of two independent systems failing at the same time is 1000 * 1000 = 1000000, that is 1:1000000 (Thanks Ben for the hint! Readers: see Baron’s comment below). That’s an improvement. Plus, the big achievement of both mysqlfailover and MHA is the automation of the actual master failover procedure. For this reason alone you should use either one.
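The arithmetic is easy to check in a few lines. The sketch below assumes that the two failures are independent and that each component is down with odds of 1:1000; both are simplifications for illustration, and the class and method names are made up for this example.

```csharp
using System;

class SpofMath
{
    // Odds (1:n) that two independent components are down at the same time,
    // given the odds 1:a and 1:b of each one being down.
    public static long BothFailOdds(long a, long b)
    {
        return a * b;
    }

    // Probability that at least one of two independent components is down.
    public static double AnyFails(double p1, double p2)
    {
        return 1.0 - (1.0 - p1) * (1.0 - p2);
    }

    static void Main()
    {
        Console.WriteLine("both down at once: 1:{0}", BothFailOdds(1000, 1000));
        Console.WriteLine("at least one down: {0}", AnyFails(0.001, 0.001));
    }
}
```

Both numbers matter here. Monitor and master being down at the same moment is a one-in-a-million event under these assumptions, while the chance that at least one of the two is down is roughly twice the chance for a single component.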

A proper but complex no SPOF setup

A proper cluster solution such as Windows Failover Clustering or Pacemaker/Corosync/DRBD is way more robust (BTW, the whitepapers are worth checking). Pacemaker/Corosync eliminates all single points of failure. Additionally, it prevents transaction loss and speeds up the failover by using a Distributed Replicated Block Device (DRBD) to update a standby MySQL replication master server.

MySQL Replication 3rd party setup aiming at 99.99% HA
                                   Writes                    Reads
Pacemaker (CRM)   Corosync (CCM)   Master (Standby)   DRBD
Pacemaker (CRM)   Corosync (CCM)   Master (Active)    DRBD   Slave (one of many)
Pacemaker (CRM)   Corosync (CCM)                             Slave
Pacemaker (CRM)   Corosync (CCM)                             Slave (one of many)

This is a true Unix style design. There are many independent, small programs working together. At the core you find a Cluster Resource Manager (CRM) responsible for taking all kinds of actions, for example, database failover. And, at the core you find a separate Cluster Communication Manager (CCM) that adds communication channels between all the nodes. A CCM usually uses redundant connections between the nodes. Given three nodes A, B, C connected to each other, with A and B losing their connection A->B, the CCM still allows A and B to communicate through C: A->C->B. The Pacemaker CRM and the Corosync CCM run on all the nodes. No single point of failure.
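The A->C->B detour can be illustrated with a tiny path search. This is only a sketch of the idea of redundant paths, not a description of how Corosync routes messages internally; the Rerouting class and its link list are assumptions made up for this example.

```csharp
using System;
using System.Collections.Generic;

class Rerouting
{
    // Breadth-first search over undirected links.
    // Returns the sequence of hops from 'from' to 'to', or null if none exists.
    public static List<string> FindPath(List<string[]> links, string from, string to)
    {
        Dictionary<string, string> previous = new Dictionary<string, string>();
        Queue<string> queue = new Queue<string>();
        previous[from] = null;
        queue.Enqueue(from);
        while (queue.Count > 0) {
            string node = queue.Dequeue();
            if (node == to) {
                // Walk back from the target to the start to build the path.
                List<string> path = new List<string>();
                for (string n = to; n != null; n = previous[n]) {
                    path.Insert(0, n);
                }
                return path;
            }
            foreach (string[] link in links) {
                string next = null;
                if (link[0] == node) {
                    next = link[1];
                } else if (link[1] == node) {
                    next = link[0];
                }
                if (next != null && !previous.ContainsKey(next)) {
                    previous[next] = node;
                    queue.Enqueue(next);
                }
            }
        }
        return null;
    }

    static void Main()
    {
        // The direct A-B link is down; only A-C and B-C remain.
        List<string[]> links = new List<string[]> {
            new string[] { "A", "C" },
            new string[] { "B", "C" }
        };
        Console.WriteLine(string.Join("->", FindPath(links, "A", "B")));
    }
}
```

With the direct link missing, the search finds the detour through C. Remove the B-C link as well and FindPath returns null: B is cut off from the rest.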

That is exactly what you want, but it’s way too complicated.

A typical failover procedure ignores the client badly!

MySQL HA solutions are all too often server-centric. Much emphasis is on the actual master failover procedure. There is little, if any, automation of client deployment. To avoid client deployment, the master is often assigned a virtual IP that is transferred as part of a failover.

Failed master New master
Virtual IP ->

But master failover is not the only event clients need to be made aware of. Slave additions and failures matter, too. Telling a DBA to deploy clients manually is 1990s. This is not what keeps your product relevant in 2013. Sometimes a blown-up single monitor is proposed that clients can query to learn about changes to the cluster topology. Seriously: another SPOF, and another communication channel besides SQL?

Clients should continuously adapt themselves to cluster changes

The PHP mysqlnd driver’s replication and load balancing plugin (PECL/mysqlnd_ms) proves the potential of a driver integrated proxy:

  • making complex clusters transparent to use: read-write splitting, load balancing, service levels (e.g. abstract on consistency), semi-automatic server selection for sharding and partitioning, …
  • scale by client – no bottlenecks due to centralized proxies
  • fail by client – failure does not affect many unlike with a central proxy such as MySQL Proxy

If the HA solution only told the driver about the cluster’s state, the driver could hide even more of a cluster’s complexity and improve ease of use. Imagine drivers got real-time server load information and adapted their load balancing dynamically. Drivers could stop bombarding a badly lagging slave with requests to give it time to breathe, or lower their write rates.

The solution: Isis2, CRM/CCM moves into a MySQL plugin

Here comes the code for a system that could offer proper HA without much added complexity! The idea is simple: use a Group Communication System (GCS) in a MySQL Server Plugin to connect all servers of a cluster with each other. Use the communication primitives of the GCS to exchange state information synchronously. State information is made of role (master, slave), status (on, off, standby), possibly current system load, or whatever else may be of interest.

At any time a GCS can give a list of its members. Thus, at any time, a client can ask for a list of all servers in the cluster. The GCS jointly decides on membership changes. It recognizes the addition of servers but also their failure. If a server fails, appropriate action can be taken. For example, a failover script can be run to elect a new master and reconfigure the MySQL Replication cluster.
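One way such a failover step can work without a coordinator is to make the choice deterministic: if every member applies the same rule to the same agreed member list, all of them arrive at the same new master. The sketch below assumes exactly that. The Server class, its field names and the "lowest address wins" rule are assumptions for illustration and not part of the PoC; a real election would also have to compare replication positions.

```csharp
using System;
using System.Collections.Generic;

class Election
{
    public class Server
    {
        public string Address;  // e.g. "192.168.2.1:3306"
        public string Role;     // "master" or "slave"
        public string Status;   // "on", "off", "standby", ...
    }

    // Deterministic rule: among running slaves, the lowest address wins.
    // Returns null when no slave qualifies.
    public static Server ElectMaster(List<Server> view)
    {
        Server best = null;
        foreach (Server s in view) {
            if (s.Role != "slave" || s.Status != "on") {
                continue;
            }
            if (best == null || string.CompareOrdinal(s.Address, best.Address) < 0) {
                best = s;
            }
        }
        return best;
    }

    static void Main()
    {
        // Member list after the old master has dropped out of the group.
        List<Server> view = new List<Server> {
            new Server { Address = "192.168.2.3:3306", Role = "slave", Status = "on" },
            new Server { Address = "192.168.2.2:3306", Role = "slave", Status = "off" },
            new Server { Address = "192.168.2.1:3306", Role = "slave", Status = "on" }
        };
        Server winner = ElectMaster(view);
        Console.WriteLine("new master: {0}", winner == null ? "none" : winner.Address);
    }
}
```

Because the rule depends only on the member list, no extra voting round is needed as long as the GCS delivers the same list to everybody.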

Best HA today               Proposed PoC
No SPOF but complex         No SPOF, easy, best possible client support
Pacemaker (CRM)       -v
Corosync (CCM)        -v
MySQL                  |    MySQL
                       >    MySQL Isis2 daemon plugin (CRM/CCM)
                       >    MySQL I_S cluster state information plugin
DRBD                        (GTID logic)

Clients learn about changes in the cluster topology by querying INFORMATION_SCHEMA tables. If a client fails to connect to one server, it can pick any of the remaining and ask it for an update on the list of servers. Then, if need be, the client can reconfigure itself. No DBA action needed, zero administration within reach. As state information is exposed through I_S tables, clients do not have to learn from special, possibly centralized servers likely requiring extra communication protocols. There is no risk of management/monitoring servers getting slashdotted. And, updating client programs is easy as can be due to the use of plain SQL.
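The client side of this idea fits in a few lines. The sketch below assumes a hypothetical callback that asks one server for the current server list, for example by running a SELECT against the cluster state table; the callback, the class name and the addresses are made up for illustration and are not part of any driver.

```csharp
using System;
using System.Collections.Generic;

class TopologyClient
{
    // Hypothetical callback: asks one server for the list of servers it knows.
    // Expected to throw if the server cannot be reached.
    public delegate List<string> FetchServerList(string server);

    // Ask the known servers one after another; the first one that answers
    // supplies the fresh topology.
    public static List<string> Refresh(List<string> known, FetchServerList fetch)
    {
        foreach (string server in known) {
            try {
                return fetch(server);
            } catch (Exception) {
                // Server unreachable: try the next one.
            }
        }
        return known; // nobody answered, keep the old view
    }

    static void Main()
    {
        List<string> known = new List<string> { "10.0.0.1:3306", "10.0.0.2:3306" };
        List<string> fresh = Refresh(known, delegate(string server) {
            // Simulated cluster: the first server is down, the second one answers.
            if (server == "10.0.0.1:3306") {
                throw new Exception("connection refused");
            }
            return new List<string> { "10.0.0.2:3306", "10.0.0.3:3306" };
        });
        Console.WriteLine(string.Join(", ", fresh));
    }
}
```

Any surviving server can answer, so the client needs no dedicated management host to learn about the change.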

Just a few server plugins…

By embedding a GCS into a MySQL plugin the HA stack is greatly simplified. The job of Pacemaker and Corosync is done by the MySQL plugin. All you need for a MySQL HA setup could be delivered in one download package. All you would need to do to add HA is load some MySQL plugins… well, in theory. You get the point.

On the hunt for a C/C++ GCS

The biggest challenge in implementing such a system is the hunt for a free and Open Source Group Communication System that can be embedded in a C/C++ MySQL server plugin. Corosync has a client/daemon-server design that is not very tempting. Its brother, the Spread Toolkit, is somewhat limited to ~40 nodes. Neither solution has an appealing API. The rest is either old, implemented in the wrong language (Java) or out of the question for me (Galera). I had almost given up searching… if only MySQL had a GCS.

Then came Isis2. Wrong language (C#) but what an API! The Isis2 Cloud Computing Library is the successor of a true classic, the Isis library. Experts will immediately associate the Virtual Synchrony Model (see also slide 34 and following) with Isis and recall the success stories. Isis2 combines Virtual Synchrony and Paxos ideas (see video). Paxos is commonly used in NoSQL solutions.

Most important: how easy Isis2 makes distributed cloud computing. A simple job like exchanging state information between MySQL servers becomes absolutely trivial!

One downside: C# to C/C++ language barrier

For my purposes, Isis2 has one big downside. It is written in C#/.NET. A MySQL C/C++ server plugin cannot use it directly. MySQL cannot become a direct member of an Isis2 group. Instead, one has to write an Isis2 client in C# which communicates with MySQL through a network socket. As we now have two independent processes, there is an additional heartbeat between MySQL and the Isis2 client in the PoC. This heartbeat would not be required if everything ran in one process, which would be the case with a pure C/C++ GCS.

Programming language   Component
C/C++                  MySQL
C/C++                  MySQL daemon Isis2 heartbeat plugin
                       |
C#/.NET (Mono)         Isis2 client socket server
I’m happy to pay this price for the wonderful API that Isis2 offers. Expressed in extra lines of code, the price for the PoC is probably less than 20%, or about 200 lines. Let the hacking begin…

The Isis2 client socket server

Disclaimer: all the code I show is entirely untested and partially known to be buggy or unstable. I stopped my fun tour through C#/.NET and MySQL plugin development after three days. This was the time it took me to realize “it could be done”, which is all I wanted for a PoC. That said, here is the code for the Isis2 client.


using System;
using System.Text;
using System.Net;
using System.Net.Sockets;
using System.Threading;

using Isis;
using System.Collections.Generic;
using System.Linq;

namespace IsisMySQLConnector
{

[AutoMarshalled]
class MySQLServer {
    public string isis_daemon;
    public string host;
    public string port;
    public string status;
    public string role;
    public int ttl;
    public string last_update;

    public MySQLServer() { }
    public MySQLServer(string daemon, string h, string p, string st, string ro, string up) {
        isis_daemon = daemon;
        host = h;
        port = p;
        status = st;
        role = ro;
        ttl = 30;
        last_update = up;
    }
}

delegate void RemoteRegisterMySQLServer(string daemon, string host, string port, string status, string role, string last_update);
delegate void RemoteUnregisterMySQLServer(string daemon, string host, string port);
delegate void RemoteExpireMySQLServer(string daemon, string host, string port);
delegate void RemoteHeartbeat(string daemon, string host, string port, string status, int ttl, string last_update);
delegate void RemoteViewChange(View v);
delegate void LoadMySQLServersCheckpoint(List<MySQLServer> mysqlServers);

class GroupConnector
{
    protected Isis.Timeout myTO = new Isis.Timeout(10000, Isis.Timeout.TO_FAILURE);
    protected EOLMarker myEOL = new EOLMarker();
    protected int UPDATE_MYSQL_REGISTER = 1;
    protected int UPDATE_MYSQL_UNREGISTER = 2;
    protected int UPDATE_MYSQL_HEARTBEAT = 3;
    protected int UPDATE_MYSQL_EXPIRE = 4;
    protected string isis_group;
    protected bool connected = false;
    protected Group myGroup;

    private Object thisLock = new Object();
    protected List<MySQLServer> mysqlServers = new List<MySQLServer>();

    internal const byte MYSQL_SERVER_TID = 123;

    public GroupConnector(string group) {
        isis_group = group;
        Thread.CurrentThread.Name = "GroupConnector Main " + isis_group;
        IsisSystem.Start();
        Msg.RegisterType(typeof(MySQLServer), MYSQL_SERVER_TID);
    }

    ~GroupConnector() {
        IsisSystem.Shutdown();
    }

    public void Connect() {
        if (connected) {
            throw new Exception("Already connected.");
        }

        myGroup = new Group(isis_group);
        myGroup.Handlers[UPDATE_MYSQL_REGISTER] += (RemoteRegisterMySQLServer)RemoteRegisterMySQLServer;
        myGroup.Handlers[UPDATE_MYSQL_UNREGISTER] += (RemoteUnregisterMySQLServer)RemoteUnregisterMySQLServer;
        myGroup.Handlers[UPDATE_MYSQL_HEARTBEAT] += (RemoteHeartbeat)RemoteHeartbeat;
        myGroup.Handlers[UPDATE_MYSQL_EXPIRE] += (RemoteExpireMySQLServer)RemoteExpireMySQLServer;
        myGroup.ViewHandlers += (Isis.ViewHandler)RemoteViewChange;

        myGroup.MakeChkpt += (Isis.ChkptMaker)delegate(View nv) {
            lock(thisLock) {
                if (mysqlServers.Count > 0) {
                    Console.WriteLine("Transferring state to joining member");
                    myGroup.SendChkpt(mysqlServers);
                }
                myGroup.EndOfChkpt();
            }
        };

        myGroup.LoadChkpt += (LoadMySQLServersCheckpoint)delegate(List<MySQLServer> initialMySQLServers) {
            lock(thisLock) {
                foreach (MySQLServer server in initialMySQLServers) {
                    mysqlServers.Add(server);
                    Console.WriteLine("[GroupConnector][state transfer] received: {0}-{1}-{2}-{3}-{4}-{5}",
                                      server.isis_daemon,
                                      server.host,
                                      server.port,
                                      server.role,
                                      server.status,
                                      server.ttl);
                }
            }
        };
        myGroup.Join();
        connected = true;
    }

    protected void RemoteViewChange(View v) {
        if (v.leavers.Length > 0) {
            Console.WriteLine("[GroupConnector][view change][{0}] Daemon leaves", Thread.CurrentThread.Name);
            Thread t = new Thread(delegate() {
                lock(thisLock) {
                    int idx;
                    foreach (Isis.Address leaver in v.leavers) {
                        while ((idx = mysqlServers.FindIndex(
                                          delegate(MySQLServer server)
                        {
                            return server.isis_daemon == leaver.ToString();
                        })) != -1) {
                            MySQLServer server = mysqlServers[idx];
                            Console.WriteLine("[GroupConnector][view change][{0}] removing server {1}-{2}-{3}-{4}-{5}-{6}", Thread.CurrentThread.Name,
                                              server.isis_daemon,
                                              server.host,
                                              server.port,
                                              server.role,
                                              server.status,
                                              server.ttl
                                             );
                            mysqlServers.RemoveAt(idx);
                        }
                    }

                }
            });
            t.Name = "GroupConnector ViewChange " + isis_group;
            t.Start();
        } else {
            Console.WriteLine("[GroupConnector][view change][{0}] Some view change (e.g. join), no action required", Thread.CurrentThread.Name);
        }
    }

    protected void RemoteRegisterMySQLServer(string daemon, string host, string port, string status, string role, string last_update) {
        Thread t = new Thread(delegate() {
            lock(thisLock) {
                int idx = mysqlServers.FindIndex(
                              delegate(MySQLServer server)
                {
                    return ((server.isis_daemon == daemon) && (server.host == host) && (server.port == port));
                });
                if (idx == -1) {
                    mysqlServers.Add(new MySQLServer(daemon, host, port, status, role, last_update));
                    Console.WriteLine("[GroupConnector][remote mysql register][{0}] Added {1}-{2}-{3}-{4}-{5}-{6}-30 New server count {7}",
                                      Thread.CurrentThread.Name,
                                      daemon,
                                      host,
                                      port,
                                      status,
                                      role,
                                      last_update,
                                      mysqlServers.Count);
                }
            }
        });
        t.Name = "RemoteRegisterMySQLServer " + isis_group;
        myGroup.SetReplyThread(t);
        t.Priority = ThreadPriority.AboveNormal;
        t.Start();
    }

    protected void RemoteUnregisterMySQLServer(string daemon, string host, string port) {
        Thread t = new Thread(delegate() {
            lock(thisLock) {
                int idx = mysqlServers.FindIndex(
                              delegate(MySQLServer server)
                {
                    return ((server.isis_daemon == daemon) && (server.host == host) && (server.port == port));
                });
                if (idx != -1) {
                    MySQLServer server = mysqlServers[idx];
                    Console.WriteLine("[GroupConnector][unregister][{0}] removing server {1}-{2}-{3}-{4}-{5}-{6}-{7}", Thread.CurrentThread.Name,
                                      server.isis_daemon,
                                      server.host,
                                      server.port,
                                      server.role,
                                      server.status,
                                      server.last_update,
                                      server.ttl
                                     );
                    mysqlServers.RemoveAt(idx);
                }
            }
        });
        t.Name = "RemoteUnregisterMySQLServer " + isis_group;
        myGroup.SetReplyThread(t);
        t.Priority = ThreadPriority.AboveNormal;
        t.Start();
    }

    protected void RemoteHeartbeat(string daemon, string host, string port, string status, int ttl, string last_update) {
        if (daemon == IsisSystem.GetMyAddress().ToString()) {
            Console.WriteLine("[GroupConnector][heartbeat][{0}] Ignoring heartbeat message to ourselves to avoid deadlock.",
                              Thread.CurrentThread.Name
                             );
            return;
        }
        Thread t = new Thread(delegate() {
            lock(thisLock) {
                int idx = mysqlServers.FindIndex(
                              delegate(MySQLServer server)
                {
                    return ((server.isis_daemon == daemon) && (server.host == host) && (server.port == port));
                });
                if (idx != -1) {
                    mysqlServers[idx].ttl = ttl;
                    mysqlServers[idx].status = status;
                    mysqlServers[idx].last_update = last_update;
                    MySQLServer server = mysqlServers[idx];
                    Console.WriteLine("[GroupConnector][heartbeat][{0}] heartbeat server {1}-{2}-{3}-{4}-{5}-{6}-{7}", Thread.CurrentThread.Name,
                                      server.isis_daemon,
                                      server.host,
                                      server.port,
                                      server.role,
                                      server.status,
                                      server.ttl,
                                      server.last_update
                                     );
                }
            }
        });
        t.Name = "RemoteHeartbeat" + isis_group;
        myGroup.SetReplyThread(t);
        t.Priority = ThreadPriority.AboveNormal;
        t.Start();
    }

    protected void RemoteExpireMySQLServer(string daemon, string host, string port) {
        Thread t = new Thread(delegate() {
            lock(thisLock) {
                int idx = mysqlServers.FindIndex(
                              delegate(MySQLServer server)
                {
                    return ((server.isis_daemon == daemon) && (server.host == host) && (server.port == port));
                });
                if (idx != -1) {
                    MySQLServer server = mysqlServers[idx];
                    Console.WriteLine("[GroupConnector][remote expire][{0}] removing server {1}-{2}-{3}-{4}-{5}-{6}-{7}", Thread.CurrentThread.Name,
                                      server.isis_daemon,
                                      server.host,
                                      server.port,
                                      server.role,
                                      server.status,
                                      server.last_update,
                                      server.ttl
                                     );
                    mysqlServers.RemoveAt(idx);
                }
            }
        });
        t.Name = "RemoteExpireMySQLServer" + isis_group;
        myGroup.SetReplyThread(t);
        t.Priority = ThreadPriority.AboveNormal;
        t.Start();
    }

    public void RegisterMySQLServer(string host, string port, string status, string role) {
        myGroup.SafeSend(UPDATE_MYSQL_REGISTER, IsisSystem.GetMyAddress().ToString(), host, port, status, role, DateTime.Now.ToString());
    }

    public void UnregisterMySQLServer(string host, string port) {
        myGroup.SafeSend(UPDATE_MYSQL_UNREGISTER, IsisSystem.GetMyAddress().ToString(), host, port);
    }

    public string GetMySQLServers() {
        string ServerList = "";
        lock(thisLock) {
            foreach (MySQLServer server in mysqlServers) {
                ServerList += server.isis_daemon + " ";
                ServerList += server.host + " ";
                ServerList += server.port + " ";
                ServerList += server.status + " ";
                ServerList += server.role + " ";
                ServerList += server.ttl + " ";
                ServerList += server.last_update + Environment.NewLine;
            }
        }
        return ServerList;
    }

    public void HeartbeatMySQLServer(string host, string port, string status) {
        lock(thisLock) {
            int idx = mysqlServers.FindIndex(
                          delegate(MySQLServer server)
            {
                return ((server.isis_daemon == IsisSystem.GetMyAddress().ToString()) && (server.host == host) && (server.port == port));
            });
            if (idx != -1) {
                /* We have created this one. Thus, we know how to interpret the time. Its our clock. */
                string last_update = DateTime.Now.ToString();
                Console.WriteLine("LastUpdate before heartbeat {0}", mysqlServers[idx].last_update);
                myGroup.SafeSend(UPDATE_MYSQL_HEARTBEAT, IsisSystem.GetMyAddress().ToString(), host, port, status, 30, last_update);
                mysqlServers[idx].ttl = 30;
                mysqlServers[idx].status = status;
                mysqlServers[idx].last_update = last_update;
                Console.WriteLine("LastUpdate after heartbeat {0}", mysqlServers[idx].last_update);
            }
        }
    }

    public void ExpireMySQLServer() {
        /* Note: we expire servers one by one, may take multiple calls to expire all our dead servers */
        string remove_host = "";
        string remove_port = "";
        string me = IsisSystem.GetMyAddress().ToString();
        lock(thisLock) {
            /* Only servers registered through this daemon qualify: we have set their last_update
             * time, so it must be interpreted in relation to our local clock only. Other servers
             * may have a different local time - clocks may not be synchronized.
             * The TTL check is part of the search so that a healthy first entry cannot hide
             * an expired later one. */
            int idx = mysqlServers.FindIndex(
                          delegate(MySQLServer server)
            {
                if (server.isis_daemon != me) {
                    return false;
                }
                TimeSpan age = DateTime.Now - DateTime.Parse(server.last_update);
                return age > new TimeSpan(0, 0, server.ttl);
            });
            if (idx != -1) {
                remove_host = mysqlServers[idx].host;
                remove_port = mysqlServers[idx].port;
            }
        }
        if (remove_host != "") {
            myGroup.SafeSend(UPDATE_MYSQL_EXPIRE, me, remove_host, remove_port);
        }
    }

}

class GroupDaemon
{
    protected string[] mysqlStatus = {"on", "off", "starting", "stopping", "standby"};
    protected string[] mysqlRole = {"master", "slave", "standalone"};
    protected GroupConnector isisConnector;

    public GroupDaemon(GroupConnector c) {
        isisConnector = c;
    }

    public void ReceiveCallback(IAsyncResult AsyncCall)
    {
        byte[] questionBuffer = new byte[512];
        System.Text.ASCIIEncoding encoding = new System.Text.ASCIIEncoding();
        Socket listener = (Socket)AsyncCall.AsyncState;
        Socket client = listener.EndAccept(AsyncCall);
        int byteCount = client.Receive(questionBuffer, SocketFlags.None);
        if (byteCount > 0) {
            /* Decode only the bytes actually received, not the zero padding of the buffer */
            string question = encoding.GetString(questionBuffer, 0, byteCount);
            string answer = ParseCommands(question);
            
            Console.WriteLine("Q: {0} '{1}'", byteCount, question);
            Console.WriteLine("A: {0} '{1}'", answer.Length, answer);
            
            client.Send(encoding.GetBytes(answer));
        }
        client.Close();

        listener.BeginAccept(new AsyncCallback(ReceiveCallback), listener);
    }

    public string ParseCommands(string command) {

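        /* Keep the first line only, then strip line-end characters, padding and NUL bytes.
         * Works for both "\n" and "\r\n" terminated commands. */
        int eol = command.IndexOf('\n');
        if (eol >= 0) {
            command = command.Substring(0, eol);
        }
        command = command.Trim(' ', '\r', '\0');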

        string[] parts = command.Split(new Char[] {' '}, StringSplitOptions.RemoveEmptyEntries);
        if (parts.Length < 1) {
            return "1 Invalid command";
        }

        for (int i = 0; i < parts.Length; i++) {
            parts[i] = parts[i].ToLower();
        }


        if (parts[0] == "join") {
            string syntax = "join <host> <port> <mysqlStatus> <mysqlRole>";

            if (parts.Length != 5) {
                return "2 Wrong number of parameters, syntax: " + syntax;
            }
            if (!IsListed(parts[3], mysqlStatus)) {
                return "3 Unknown mysqlStatus '" + parts[3] + "', syntax: " + syntax;
            }
            if (!IsListed(parts[4], mysqlRole)) {
                return "4 Unknown mysqlRole '" + parts[4] + "', syntax: " + syntax;
            }

            isisConnector.RegisterMySQLServer(parts[1], parts[2], parts[3], parts[4]);
            return "0 OK";
        } else if (parts[0] == "leave") {
            string syntax = "leave <host> <port>";

            if (parts.Length != 3) {
                return "2 Wrong number of parameters, syntax: " + syntax;
            }

            isisConnector.UnregisterMySQLServer(parts[1], parts[2]);
            return "0 OK";
        } else if (parts[0] == "heartbeat") {
            string syntax = "heartbeat <host> <port> <mysqlStatus>";

            if (parts.Length != 4) {
                return "2 Wrong number of parameters, syntax: " + syntax;
            }
            if (!IsListed(parts[3], mysqlStatus)) {
                return "3 Unknown mysqlStatus '" + parts[3] + "', syntax: " + syntax;
            }

            isisConnector.HeartbeatMySQLServer(parts[1], parts[2], parts[3]);
            return isisConnector.GetMySQLServers();
        } else if (parts[0] == "serverlist") {
            return "0 " + isisConnector.GetMySQLServers();
        }

        return "1 Invalid command";
    }

    protected bool IsListed(string what, string[] list) {
        foreach (string member in list) {
            if (member == what) {
                return true;
            }
        }
        return false;
    }

    public void StartListener(string host, int port)
    {
        IPAddress localAddress = IPAddress.Parse(host);
        Socket listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint ipEndpoint = new IPEndPoint(localAddress, port);
        listenSocket.Bind(ipEndpoint);
        listenSocket.Listen(100 /* queue length */);
        listenSocket.BeginAccept(new AsyncCallback(ReceiveCallback), listenSocket);
        Console.WriteLine("[GroupDaemon] Server is waiting on socket {0}", listenSocket.LocalEndPoint);
    }

    public void Sleep(int duration) {
        Thread.Sleep(duration);
    }
}

class IsisDaemonMain
{

    /* Endpoint to which MySQL shall talk  - IPv4 host to bind to and listen on */
    protected const string listener_ipv4 = "127.0.0.1";
    /* Endpoint to which MySQL shall talk - Port to bind to and listen on */
    protected const int listener_port = 2200;
    /* Default Isis group */
    protected const string isis_group = "mysql";

    public static void Main(string[] args)
    {
        string host = listener_ipv4;
        int port = listener_port;
        string group = isis_group;

        if (args.Length > 0) {
            if ((args[0] == "help") || (args[0] == "--help") || (args[0] == "-h")) {
                Console.WriteLine("Usage: <name.exe> host port group");
                return;
            }
            host = args[0];
        }
        if (args.Length > 1) {
            port = int.Parse(args[1]);
        }
        if (args.Length > 2) {
            group = args[2];
        }

        try
        {
            GroupConnector connector = new GroupConnector(group);
            Console.WriteLine("[IsisDaemonMain] Connecting to ISIS...");
            connector.Connect();

            Console.WriteLine("[IsisDaemonMain] Starting daemon for communication with MySQL.");
            GroupDaemon daemon = new GroupDaemon(connector);
            daemon.StartListener(host, port);
            Console.WriteLine("[IsisDaemonMain] Started. Listening to client requests.");
            while (true) {
                daemon.Sleep(2000);
                connector.ExpireMySQLServer();
            }
        }
        catch (Exception e)
        {
            Console.WriteLine("[IsisDaemonMain] Caught Exception: {0}", e.ToString());
        }
    }
}
}
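To see the text protocol of the socket server from the other side, here is a minimal sketch of a client. It relies only on what the listing above shows: one command per connection (join, leave, heartbeat, serverlist), an answer that starts with a status code, and a server list with seven space-separated fields per line. The DaemonClient class, its method names and the sample timestamps are made up for illustration; the MySQL plugin of the PoC is written in C/C++, not C#.

```csharp
using System;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Text;

class DaemonClient
{
    // Sends one command line and returns the daemon's complete answer.
    public static string SendCommand(string host, int port, string command)
    {
        using (TcpClient client = new TcpClient(host, port))
        using (NetworkStream stream = client.GetStream())
        {
            byte[] request = Encoding.ASCII.GetBytes(command + "\r\n");
            stream.Write(request, 0, request.Length);
            // The daemon closes the connection after one answer: read until EOF.
            StringBuilder answer = new StringBuilder();
            byte[] buffer = new byte[512];
            int n;
            while ((n = stream.Read(buffer, 0, buffer.Length)) > 0) {
                answer.Append(Encoding.ASCII.GetString(buffer, 0, n));
            }
            return answer.ToString();
        }
    }

    // Splits a "serverlist" answer into one string[7] per server:
    // daemon, host, port, status, role, ttl, last_update.
    public static List<string[]> ParseServerList(string answer)
    {
        List<string[]> servers = new List<string[]>();
        if (!answer.StartsWith("0 ", StringComparison.Ordinal)) {
            return servers; // error answer such as "1 Invalid command"
        }
        string[] lines = answer.Substring(2).Split(new char[] { '\n' },
                             StringSplitOptions.RemoveEmptyEntries);
        foreach (string line in lines) {
            // At most 7 fields, so the spaces inside last_update survive.
            string[] fields = line.Trim().Split(new char[] { ' ' }, 7);
            if (fields.Length == 7) {
                servers.Add(fields);
            }
        }
        return servers;
    }

    static void Main()
    {
        // Hypothetical answer; the timestamps are invented for this example.
        string sample = "0 (51971) 127.0.0.1 3400 on master 30 12/24/2012 10:15:00 AM\n"
                      + "(51971) 192.168.2.1 3306 on slave 30 12/24/2012 10:15:02 AM\n";
        foreach (string[] s in ParseServerList(sample)) {
            Console.WriteLine("{0}:{1} is a {2} ({3}), last update {4}", s[1], s[2], s[4], s[3], s[6]);
        }
        // Against a running daemon on the default endpoint of the listing:
        // string answer = SendCommand("127.0.0.1", 2200, "serverlist");
    }
}
```

Note that only the serverlist answer carries the leading status code; the heartbeat command returns the bare list, so a caller of ParseServerList would have to treat that answer differently.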

At the core: a distributed Isis2 group

Let’s build the core of the proposed MySQL HA cluster solution. The core is made of a distributed Isis2 group. Assume we have three hosts A, B and C in our MySQL cluster. On each host we start an Isis2 client. All clients try to join a distributed group of a certain name. Once they have joined the group, they can communicate with each other. If that all sounds a bit too magic, get yourself the book on the subject of Isis2 (Ken Birman, Guide to Reliable Distributed Systems: Building High-Assurance Applications and Cloud-Hosted Services).

Members of a distributed Isis2 group
                          Host A
                          mono ./isis2_daemon.exe
Host B                                              Host C
mono ./isis2_daemon.exe                             mono ./isis2_daemon.exe

Isis2 offers a rich variety of Send() commands to communicate within the group. Options range from low level unreliable, asynchronously delivered messages for gossip protocols to slow but reliable, totally ordered, virtually synchronous messages. The latter either reach all members of a group or none. As speed or group size is no major concern when exchanging nothing but a list of MySQL servers in a PoC, the PoC is using SafeSend() only. To an outside observer messages appear “synchronously” on all group members.

The Isis2 client connects to a virtual group

If you compile and run it (e.g. dmcs isis_deamon.cs Isis.cs ; mono ./isis_deamon.exe ), the Isis2 client tries to join a virtual, distributed group. For this it contacts the ORACLE and uses the Isis2 ORACLE Rendezvous Service. No kidding, that’s what Isis2 calls it.


nixnutz@linux-dstv:~/src/isis_201245> rm isis_deamon.exe ; dmcs isis_deamon.cs Isis.cs ; mono ./isis_deamon.exe 
Isis: Searching for the Isis ORACLE...
[IsisDaemonMain] Connecting to ISIS...

If there are already other clients from other computers registered in the group, a checkpoint is made to do a state transfer. The joining client learns the state from the other members. The state consists of a list of all MySQL servers that have registered themselves at their local Isis2 clients to become members.


[state transfer] received: (51971)-127.0.0.1-3400-master-on-30
[state transfer] received: (51971)-192.168.2.1-3306-slave-on-30

Isis2 abstracts away all the gory details from you: the networking, the group membership, the messaging associated with the state transfer and so forth. Those few lines make the core of the procedure.


 protected List<MySQLServer> mysqlServers = new List<MySQLServer>();
[...]
    public GroupConnector(string group) {
        isis_group = group;
        Thread.CurrentThread.Name = "GroupConnector Main " + isis_group;
        IsisSystem.Start();
        Msg.RegisterType(typeof(MySQLServer), MYSQL_SERVER_TID);
    }
[...]
    public void Connect() {
[...]     
        myGroup = new Group(isis_group);
        myGroup.Handlers[UPDATE_MYSQL_REGISTER] += (RemoteRegisterMySQLServer)RemoteRegisterMySQLServer;
        myGroup.Handlers[UPDATE_MYSQL_UNREGISTER] += (RemoteUnregisterMySQLServer)RemoteUnregisterMySQLServer;
        myGroup.Handlers[UPDATE_MYSQL_HEARTBEAT] += (RemoteHeartbeat)RemoteHeartbeat;
        myGroup.Handlers[UPDATE_MYSQL_EXPIRE] += (RemoteExpireMySQLServer)RemoteExpireMySQLServer;
        myGroup.ViewHandlers += (Isis.ViewHandler)RemoteViewChange;

        myGroup.MakeChkpt += (Isis.ChkptMaker)delegate(View nv) {
            lock(thisLock) {
                if (mysqlServers.Count > 0) {
                    Console.WriteLine("Transferring state to joining member");
                    myGroup.SendChkpt(mysqlServers);
                }
                myGroup.EndOfChkpt();
            }
        };

        myGroup.LoadChkpt += (LoadMySQLServersCheckpoint)delegate(List<MySQLServer> initialMySQLServers) {
            lock(thisLock) {
                foreach (MySQLServer server in initialMySQLServers) {
                    mysqlServers.Add(server);
                    Console.WriteLine("[GroupConnector][state transfer] received: {0}-{1}-{2}-{3}-{4}-{5}",
                                      server.isis_daemon,
                                      server.host,
                                      server.port,
                                      server.role,
                                      server.status,
                                      server.ttl);
                }
            }
        };
        myGroup.Join();
        connected = true;
    }
[...]

Did I already say, the Isis2 API is nice? Note this detail from the checkpointing/state transfer instructions. The Isis2 library sends a list of MySQLServer objects over the wire. As an Isis2 developer you do not give up on your standard OOP style…


protected List<MySQLServer> mysqlServers = new List<MySQLServer>();
[...]
myGroup.SendChkpt(mysqlServers);

The big downside of this beauty is the question mark behind a once discussed pure C++ port of Isis2. Developing a library that offers the same for C++ objects may be hard if not impossible.

Making an Isis2 client talk to MySQL

As soon as an Isis2 client has joined a distributed group, it starts a socket server to receive commands from the MySQL server associated with it. MySQL can then register itself in the group, announce its state and leave the group when it is shut down.

Members of a distributed Isis2 group
  Host A  
  MySQL  
  MySQL Isis2d daemon plugin  
  mono ./isis2_daemon.exe  
Host B   Host C
MySQL   MySQL
MySQL Isis2d daemon plugin   MySQL Isis2d daemon plugin
mono ./isis2_daemon.exe   mono ./isis2_daemon.exe

As you can guess, it does not require much code in the .NET framework to start a socket server and listen to network requests. Nothing fancy: add code for parsing commands and you are done.


  [...]
  public void StartListener(string host, int port)
    {
        IPAddress localAddress = IPAddress.Parse(host);
        Socket listenSocket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        IPEndPoint ipEndpoint = new IPEndPoint(localAddress, port);
        listenSocket.Bind(ipEndpoint);
        listenSocket.Listen(100 /* queue length */);
        listenSocket.BeginAccept(new AsyncCallback(ReceiveCallback), listenSocket);
        Console.WriteLine("[GroupDaemon] Server is waiting on socket {0}", listenSocket.LocalEndPoint);
    }
  [...]

You can now connect to the Isis2 client using telnet. Below is an example session with the join command that shall be used by MySQL to register itself in the distributed Isis2 group.


nixnutz@linux-dstv:~/src/isis_201245> telnet 127.0.0.1 2200 
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
join
2 Wrong number of parameters, syntax: join <host> <port> <mysqlStatus> <mysqlRole>
Connection closed by foreign host.
nixnutz@linux-dstv:~/src/isis_201245> telnet 127.0.0.1 2200 
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
join 127.0.0.1 3305 on master
0 OK
Connection closed by foreign host.
nixnutz@linux-dstv:~/src/isis_201245> telnet 127.0.0.1 2200 
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
serverlist
0 (62435) 127.0.0.1 3305 on master 30 08.08.2013 17:06:47
Connection closed by foreign host.
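The session above suggests a simple line-based protocol: a command word, space-separated arguments, and a reply made of a numeric code followed by a message. The PoC daemon does this in C#. The following is only a C++ sketch of the parsing step, with hypothetical function names. Only the join error text and "0 OK" are taken from the session; the "unknown command" reply is an assumption.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Split a request line into whitespace-separated words (a trailing "\r\n" is ignored).
std::vector<std::string> split_words(const std::string &line)
{
  std::istringstream in(line);
  std::vector<std::string> words;
  std::string word;
  while (in >> word)
    words.push_back(word);
  return words;
}

// Hypothetical dispatcher: returns "<code> <message>" in the style of the session above.
std::string handle_command(const std::string &line)
{
  std::vector<std::string> words = split_words(line);
  if (words.empty())
    return "1 Unknown command";  // assumed code and text
  if (words[0] == "join")
  {
    // join needs exactly four arguments after the command word
    if (words.size() != 5)
      return "2 Wrong number of parameters, syntax: join <host> <port> <mysqlStatus> <mysqlRole>";
    // A real daemon would now forward the registration to the group.
    return "0 OK";
  }
  return "1 Unknown command";    // assumed code and text
}
```

Putting the numeric code first keeps the client side trivial: read up to the first space, convert to an integer, and treat the rest as the payload.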

The Isis2 client that receives the join command forwards it to everybody in the group using SafeSend(). The Isis2 manual discusses the messaging API in great detail.


   public void RegisterMySQLServer(string host, string port, string status, string role) {
        myGroup.SafeSend(UPDATE_MYSQL_REGISTER, IsisSystem.GetMyAddress().ToString(), host, port, status, role, DateTime.Now.ToString());
    }

All group members, including the sender, receive the message and add the MySQL server to their server list. The MySQL server is now known to everybody in the group. On whichever host you issue a serverlist command, the MySQL server is listed.

The MySQL server gets dropped from the list if it fails to send a heartbeat to its Isis2 client or sends a leave message. If it fails to send a heartbeat, MySQL might have crashed and appropriate action could be triggered. If the local Isis2 client dies, well, that’s a weak spot (self-set three days coding limit…). The PoC simplifies the case and assumes that the host crashed. All servers associated with the Isis2 client are dropped from the group’s server list.

Please note that all Isis2 members of an Isis2 group jointly decide whether a member has crashed or not. And it takes more than a single missed heartbeat.
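The bookkeeping behind heartbeats and expiry can be sketched in a few lines of C++. This is not the PoC's C# code. It assumes each entry carries a TTL in seconds (30 in the sample output) and the time of its last heartbeat. ServerEntry, record_heartbeat and expire_servers are hypothetical names. In the PoC the result of such a check is announced to the whole group rather than applied locally only.

```cpp
#include <algorithm>
#include <cstddef>
#include <ctime>
#include <string>
#include <vector>

struct ServerEntry
{
  std::string host;
  int port;
  int ttl_seconds;             // e.g. 30, as in the serverlist output
  std::time_t last_heartbeat;  // time of the most recent heartbeat
};

// Record a heartbeat for host:port. Returns false if the server is not registered.
bool record_heartbeat(std::vector<ServerEntry> &servers, const std::string &host,
                      int port, std::time_t now)
{
  for (ServerEntry &s : servers)
  {
    if (s.host == host && s.port == port)
    {
      s.last_heartbeat = now;
      return true;
    }
  }
  return false;
}

// Drop every server whose last heartbeat is older than its TTL.
// Returns the number of dropped servers.
std::size_t expire_servers(std::vector<ServerEntry> &servers, std::time_t now)
{
  std::size_t before = servers.size();
  servers.erase(std::remove_if(servers.begin(), servers.end(),
                               [now](const ServerEntry &s) {
                                 return std::difftime(now, s.last_heartbeat) > s.ttl_seconds;
                               }),
                servers.end());
  return before - servers.size();
}
```

With a heartbeat every 10 seconds and a TTL of 30 seconds, a server has to miss several heartbeats in a row before it is dropped.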

The MySQL plugins

It’s nice to see the telnet sessions, but it would be even better to have MySQL server plugins that send join, leave and heartbeat messages and parse serverlist into something more driver friendly. Nothing as easy as that: one MySQL server plugin for the membership and another one to export an INFORMATION_SCHEMA.ISIS2IS table.

A MySQL cluster that can report its state and take HA actions
  Host A  
  MySQL  
  MySQL Isis2d daemon plugin  
  MySQL Isis2is I_S plugin  
  mono ./isis2_daemon.exe  
Host B   Host C
MySQL   MySQL
MySQL Isis2d daemon plugin   MySQL Isis2d daemon plugin
MySQL Isis2is I_S plugin   MySQL Isis2is I_S plugin
mono ./isis2_daemon.exe   mono ./isis2_daemon.exe

Here you go with the plugin code. Again, it’s incomplete and even crashing code (UNINSTALL PLUGIN will crash). It’s just good enough to make the point. There is not much to say about the plugin code that you cannot find discussed in greater depth in the MySQL manual. It’s no more than a wrapper for the network commands shown before.


#include <my_global.h>
#include <sql_priv.h>
#include <stdlib.h>
#include <stdio.h>
#include <ctype.h>
#include <mysql_version.h>
#include <mysql/plugin.h>
#include <my_dir.h>
#include "my_pthread.h"                     
#include "my_sys.h"                       
#include "m_string.h"            
#include "sql_plugin.h"
#include "sql_class.h"
#include "table.h"

bool schema_table_store_record(THD *thd, TABLE *table);
/*
  Disable __attribute__() on non-gcc compilers.
 */
#if !defined(__attribute__) && !defined(__GNUC__)
#define __attribute__(A)
#endif


#include <sstream>
#include <string>
#include <list>
#include <ctime>

#include <netdb.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>

#include <cstdlib>

#include "isis2.h"

using namespace std;

/* Common/shared utility code, defined at the very end of the file */
static int isis2_common_simple_command(std::string cmd);
static int isis2_common_query(std::string msg, std::string * reply);
static int isis2_common_reply_is_error(std::string * reply);

class Isis2_MySQL_Server
{
public:
  std::string myerrno;
  std::string myerror;
  std::string isis2_member;
  std::string mysql_host;
  std::string mysql_port_or_socket;
  std::string mysql_status;
  std::string mysql_role;
  std::string mysql_heartbeat_ttl;
  std::string mysql_heartbeat_last;

  Isis2_MySQL_Server(std::string p_errno, std::string p_error)
  {
    myerrno = p_errno;
    myerror= p_error;
  }

  Isis2_MySQL_Server(std::string p_isis2_member, std::string p_mysql_host,
                     std::string p_mysql_port_or_socket, std::string p_mysql_status,
                     std::string p_mysql_role, std::string p_mysql_heartbeat_ttl,
                     std::string p_mysql_heartbeat_last)
  {
    myerrno = "";
    myerror = "";
    isis2_member= p_isis2_member;
    mysql_host= p_mysql_host;
    mysql_port_or_socket= p_mysql_port_or_socket;
    mysql_status= p_mysql_status;
    mysql_role= p_mysql_role;
    mysql_heartbeat_ttl= p_mysql_heartbeat_ttl;
    mysql_heartbeat_last= p_mysql_heartbeat_last;
  }
} ;

static std::list<Isis2_MySQL_Server*> cached_serverlist;
static time_t cached_serverlist_update = 0;
static mysql_mutex_t LOCK_cached_serverlist;
static PSI_mutex_key key_LOCK_cached_serverlist;
static int isis2d_plugin_installed = 0;

/* isis2 I_S Information schema plugin (show daemon state) */

static ST_FIELD_INFO isis2is_table_fields[]={

  {"ERROR", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"ERRNO", 6, MYSQL_TYPE_LONG, 0, MY_I_S_UNSIGNED, 0, 0},
  {"ISIS2_MEMBER", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_HOST", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_PORT_OR_SOCKET", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_STATUS", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_ROLE", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_HEARTBEAT_TTL", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {"MYSQL_HEARTBEAT_LAST", 255, MYSQL_TYPE_STRING, 0, 0, 0, 0},
  {0, 0, MYSQL_TYPE_NULL, 0, 0, 0, 0}
};

static int isis2is_fill_table(THD *thd, TABLE_LIST *tables, Item *cond)
{
  TABLE *table= tables->table;
  int myerrno = 0;
  char * line;
  std::string reply;
  std::string cmd = "serverlist";
  std::size_t found = std::string::npos;
  std::string line_copy;
  std::string p_isis2_member, p_mysql_host, p_mysql_port_or_socket, p_mysql_status,
          p_mysql_role, p_mysql_heartbeat_ttl, p_mysql_heartbeat_last;
  time_t now;
  std::list<Isis2_MySQL_Server*>::iterator it;

  fprintf(stderr, "Fill called\n");
  if (!isis2d_plugin_installed)
  {
    reply = "You must install the isis2d daemon plugin first.";
    myerrno = 1000;
    table->field[0]->store(reply.c_str(), reply.length(), system_charset_info);
    table->field[1]->store(myerrno);
    if (schema_table_store_record(thd, table))
      return 1;
    return 0;
  }
  time(&now);

  if ((cached_serverlist_update == 0) || (difftime(now, cached_serverlist_update) > 3))
  {
    time(&cached_serverlist_update);
    /* refresh server list every n seconds */
    mysql_mutex_lock(&LOCK_cached_serverlist);
    /* free the old entries, clear() alone only drops the pointers */
    for (it = cached_serverlist.begin(); it != cached_serverlist.end(); ++it)
      delete *it;
    cached_serverlist.clear();
    if ((0 != isis2_common_query(cmd, &reply)) ||
        ((myerrno = isis2_common_reply_is_error(&reply)) > 0))
    {
      /* constructor argument order is (errno, error message) */
      cached_serverlist.push_back(new Isis2_MySQL_Server("1000", reply));
    } else
    {
      line = strtok((char*) reply.c_str(), "\n");
      while (line != NULL)
      {
        line_copy = line;
        PARSE_ENTRY(line_copy, found, p_isis2_member);
        PARSE_ENTRY(line_copy, found, p_mysql_host);
        PARSE_ENTRY(line_copy, found, p_mysql_port_or_socket);
        PARSE_ENTRY(line_copy, found, p_mysql_status);
        PARSE_ENTRY(line_copy, found, p_mysql_role);
        PARSE_ENTRY(line_copy, found, p_mysql_heartbeat_ttl);
        /* whatever is left of the line is the last heartbeat timestamp */
        cached_serverlist.push_back(new Isis2_MySQL_Server(
                                                           p_isis2_member, p_mysql_host,
                                                           p_mysql_port_or_socket, p_mysql_status,
                                                           p_mysql_role, p_mysql_heartbeat_ttl,
                                                           line_copy));
        line = strtok(NULL, "\n");
      }
    }
    /* release the lock on both the error and the success path */
    mysql_mutex_unlock(&LOCK_cached_serverlist);
  }

  for (it = cached_serverlist.begin(); it != cached_serverlist.end(); ++it)
  {
    table->field[0]->store((*it)->myerror.c_str(), (*it)->myerror.length(), system_charset_info);
    table->field[1]->store((*it)->myerrno.c_str(), (*it)->myerrno.length(), system_charset_info);
    table->field[2]->store((*it)->isis2_member.c_str(), (*it)->isis2_member.length(), system_charset_info);
    table->field[3]->store((*it)->mysql_host.c_str(), (*it)->mysql_host.length(), system_charset_info);
    table->field[4]->store((*it)->mysql_port_or_socket.c_str(), (*it)->mysql_port_or_socket.length(), system_charset_info);
    table->field[5]->store((*it)->mysql_status.c_str(), (*it)->mysql_status.length(), system_charset_info);
    table->field[6]->store((*it)->mysql_role.c_str(), (*it)->mysql_role.length(), system_charset_info);
    table->field[7]->store((*it)->mysql_heartbeat_ttl.c_str(), (*it)->mysql_heartbeat_ttl.length(), system_charset_info);
    table->field[8]->store((*it)->mysql_heartbeat_last.c_str(), (*it)->mysql_heartbeat_last.length(), system_charset_info);
    if (schema_table_store_record(thd, table))
      return 1;
  }
  return 0;
}

static int isis2is_table_init(void *ptr)
{
  ST_SCHEMA_TABLE *schema_table= (ST_SCHEMA_TABLE*) ptr;

  schema_table->fields_info= isis2is_table_fields;
  schema_table->fill_table= isis2is_fill_table;
  return 0;
}

/* isis2 DAEMON plugin (heartbeat) */

static char * sysvar_isis2d_daemon_host;
static MYSQL_SYSVAR_STR(daemon_host,
                        sysvar_isis2d_daemon_host,
                        PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
                        "Isis2 connect daemon host. Can be set during startup only",
                        NULL,
                        NULL,
                        "127.0.0.1");

static unsigned int sysvar_isis2d_daemon_port;
static MYSQL_SYSVAR_UINT(daemon_port,
                         sysvar_isis2d_daemon_port,
                         PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
                         "Isis2 connect daemon port (1 to 65535). Can be set during startup only.",
                         NULL,
                         NULL,
                         2200,
                         0,
                         65535,
                         0);

static char * sysvar_isis2d_mysql_host;
static MYSQL_SYSVAR_STR(mysql_host,
                        sysvar_isis2d_mysql_host,
                        PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
                        "MySQL server host. Can be set during startup only.",
                        NULL,
                        NULL,
                        "127.0.0.1");

static unsigned int sysvar_isis2d_mysql_port;
static MYSQL_SYSVAR_UINT(mysql_port,
                         sysvar_isis2d_mysql_port,
                         PLUGIN_VAR_RQCMDARG | PLUGIN_VAR_READONLY,
                         "MySQL port (1 to 65535). Can be set during startup only.",
                         NULL,
                         NULL,
                         3400,
                         0,
                         65535,
                         0);

struct st_mysql_sys_var * isis2d_sysvars[] ={
  MYSQL_SYSVAR(daemon_host),
  MYSQL_SYSVAR(daemon_port),
  MYSQL_SYSVAR(mysql_host),
  MYSQL_SYSVAR(mysql_port),
  NULL
};

struct isis2d_heartbeat_context
{
  pthread_t heartbeat_thread;
} ;

pthread_handler_t isis2d_heartbeat(void *p)
{
  DBUG_ENTER("isis2d_heartbeat");
  std::stringstream isis_cmd;
  isis_cmd << "heartbeat " << sysvar_isis2d_mysql_host << " " << sysvar_isis2d_mysql_port << " on";

  while (1)
  {
    sleep(10);
    isis2_common_simple_command(isis_cmd.str());
  }
  DBUG_RETURN(0);
}

static int isis2d_plugin_init(void *p)
{
  DBUG_ENTER("isis2d_plugin_init");
  std::stringstream isis_cmd;
  pthread_attr_t attr;
  struct st_plugin_int *plugin = (struct st_plugin_int *) p;
  struct isis2d_heartbeat_context *con;
  con = (struct isis2d_heartbeat_context *)
          my_malloc(sizeof (struct isis2d_heartbeat_context) , MYF(0));

  /* Register MySQL server */
  isis_cmd << "join " << sysvar_isis2d_mysql_host << " " << sysvar_isis2d_mysql_port << " on master";
  isis2_common_simple_command(isis_cmd.str());

  mysql_mutex_init(key_LOCK_cached_serverlist, &LOCK_cached_serverlist, MY_MUTEX_INIT_FAST);
  isis2d_plugin_installed = 1;

  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  if (pthread_create(&con->heartbeat_thread, &attr, isis2d_heartbeat,
                     (void *) con) != 0)
  {
    fprintf(stderr, "Could not create heartbeat thread!\n");
    exit(0);
  }
  plugin->data = (void *) con;

  DBUG_RETURN(0);
}

static int isis2d_plugin_deinit(void *p)
{
  DBUG_ENTER("isis2d_plugin_deinit");
  struct st_plugin_int *plugin = (struct st_plugin_int *) p;
  struct isis2d_heartbeat_context *con =
          (struct isis2d_heartbeat_context *) plugin->data;
  void *dummy_retval;
  std::stringstream isis_cmd;

  mysql_mutex_destroy(&LOCK_cached_serverlist);

  pthread_cancel(con->heartbeat_thread);
  pthread_join(con->heartbeat_thread, &dummy_retval);

  isis_cmd << "leave " << sysvar_isis2d_mysql_host << " " << sysvar_isis2d_mysql_port;
  isis2_common_simple_command(isis_cmd.str());

  DBUG_RETURN(0);
}

static struct st_mysql_daemon isis2d_plugin ={MYSQL_DAEMON_INTERFACE_VERSION};
static struct st_mysql_information_schema isis2is_plugin ={ MYSQL_INFORMATION_SCHEMA_INTERFACE_VERSION };

mysql_declare_plugin(isis2)
{
  MYSQL_DAEMON_PLUGIN,
          &isis2d_plugin,
          "isis2d",
          "Ulf Wendel",
          "MySQL Isis2 Cloud Library Connector Daemon",
          PLUGIN_LICENSE_GPL,
          isis2d_plugin_init,
          isis2d_plugin_deinit,
          0x0100 ,
          NULL,
          isis2d_sysvars,
          NULL,
          0,
},
{
  MYSQL_INFORMATION_SCHEMA_PLUGIN,
          &isis2is_plugin,
          "isis2is",
          "Ulf Wendel",
          "MySQL Isis2 Cloud Library Connector I_S",
          PLUGIN_LICENSE_GPL,
          isis2is_table_init,
          NULL,
          0x0100,
          NULL,
          NULL,
          NULL,
          0
}

mysql_declare_plugin_end;

/* Common/shared utility functions*/
static int isis2_common_simple_command(std::string cmd)
{
  int myerrno = -1; /* stays -1 if the query itself fails */
  std::string reply;
  if ((0 != isis2_common_query(cmd, &reply)) ||
      ((myerrno = isis2_common_reply_is_error(&reply)) > 0))
  {
    std::stringstream msg;
    msg << cmd << " failed: '" << reply << "' (" << myerrno << ")";
    /* never pass external data as the format string */
    fprintf(stderr, "%s\n", msg.str().c_str());
    return myerrno;
  } else
  {
    fprintf(stderr, "%s\n", cmd.c_str());
    return 0;
  }
}

static int isis2_common_query(std::string msg, std::string * reply)
{
  struct sockaddr_in address;
  struct in_addr inadr;
  struct hostent *host;
  int sockfd, n;
  char buf[51];

  if (inet_aton(sysvar_isis2d_daemon_host, &inadr))
  {
    host = gethostbyaddr((char*) &inadr, sizeof (inadr), AF_INET);
  } else
  {
    host = gethostbyname(sysvar_isis2d_daemon_host);
  }
  if (host == NULL)
  {
    fprintf(stderr, "Failed to resolve Isis2 daemon host name\n");
    return -1;
  }
  address.sin_family = AF_INET;
  address.sin_port = htons(sysvar_isis2d_daemon_port);
  memcpy(&address.sin_addr, host->h_addr_list[0], sizeof (address.sin_addr));

  if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) < 0)
  {
    fprintf(stderr, "socket() failed\n");
    return -1;
  }
  if (connect(sockfd, (struct sockaddr*) &address, sizeof (address) ))
  {
    fprintf(stderr, "connect() failed\n");
    close(sockfd); /* do not leak the descriptor on error */
    return -1;
  }
  msg.append("\r\n");
  if ((unsigned int) write(sockfd, msg.c_str(), msg.length()) != msg.length())
  {
    fprintf(stderr, "write() failed\n");
    close(sockfd);
    return -1;
  }
  while ((n = read(sockfd, buf, sizeof (buf))) > 0)
  {
    for (int i = 0; i < n; i++)
    {
      reply->push_back(buf[i]);
    }
  }
  close(sockfd);
  return 0;
}

static int isis2_common_reply_is_error(std::string * reply)
{
  int isis_errno = -1;
  std::size_t found = reply->find_first_of(" ");
  if (found != std::string::npos)
  {
    isis_errno = atoi(reply->substr(0, found).c_str());
    *reply = reply->substr(found + 1);
  }
  return isis_errno;
}
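The listing uses a PARSE_ENTRY macro from isis2.h, which is not shown here. Judging from how it is called, it cuts the next space-delimited field off the front of a serverlist line. The following stand-in is an assumption about that behaviour, not the original macro, and take_field is a made-up name.

```cpp
#include <string>

// Cut the first space-delimited field off the front of `line` and return it.
// If no space is left, the whole remainder is returned and `line` becomes empty.
std::string take_field(std::string &line)
{
  std::size_t found = line.find(' ');
  std::string field = line.substr(0, found);
  line = (found == std::string::npos) ? std::string() : line.substr(found + 1);
  return field;
}
```

Called six times on a line such as "(62435) 127.0.0.1 3305 on master 30 08.08.2013 17:06:47", it yields member, host, port, status, role and TTL. The remainder is the timestamp of the last heartbeat, which itself contains a space and is therefore taken as a whole.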


Loading the Plugins into MySQL

Once you load the plugins into MySQL, the MySQL server will try to register MySQL at its local Isis2 client using join. Every now and then, it sends a heartbeat. Upon server shutdown or plugin uninstallation, it sends leave.

At any time, on any MySQL server that has the plugins installed, you can query the I_S for a list of all MySQL servers in the cluster.


mysql> SELECT * FROM INFORMATION_SCHEMA.ISIS2IS\G
*************************** 1. row ***************************
               ERROR: 
               ERRNO: 0
        ISIS2_MEMBER: (50249)
          MYSQL_HOST: 127.0.0.1
MYSQL_PORT_OR_SOCKET: 3400
        MYSQL_STATUS: on
          MYSQL_ROLE: master
 MYSQL_HEARTBEAT_TTL: 30
MYSQL_HEARTBEAT_LAST: 07.08.2013 13:50:22
1 row in set (0,01 sec)

A fictional user manual for the DBA

Here’s the user manual for the HA solution proposed. For every MySQL server do:

  1. Start Isis2 client: mono ./isis_deamon.exe
  2. Configure Connector Plugins, e.g. Isis2 client address
  3. Heartbeat to Isis2: INSTALL PLUGIN isis2d SONAME 'libisis2.so'
  4. I_S Plugin: INSTALL PLUGIN isis2is SONAME 'libisis2.so'
  5. Teach your clients to monitor INFORMATION_SCHEMA
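Step 5 is the part the PoC leaves to the reader. As a rough idea of what a client could do with the rows of INFORMATION_SCHEMA.ISIS2IS, here is a C++ sketch. It assumes the rows have already been fetched with whatever client library you use. ClusterRow, find_master and find_online_slaves are hypothetical names, and any status value other than "on" is an assumption because the sample output only ever shows "on".

```cpp
#include <string>
#include <vector>

// One row of INFORMATION_SCHEMA.ISIS2IS, reduced to the columns a client needs.
struct ClusterRow
{
  std::string mysql_host;
  std::string mysql_port_or_socket;
  std::string mysql_status;  // "on" in the sample output
  std::string mysql_role;    // "master" or "slave"
};

// Return the first server that is a master and reported as on, or nullptr if there is none.
const ClusterRow *find_master(const std::vector<ClusterRow> &rows)
{
  for (const ClusterRow &row : rows)
  {
    if (row.mysql_role == "master" && row.mysql_status == "on")
      return &row;
  }
  return nullptr;
}

// Return all slaves that are reported as on, i.e. the candidates for reads.
std::vector<ClusterRow> find_online_slaves(const std::vector<ClusterRow> &rows)
{
  std::vector<ClusterRow> slaves;
  for (const ClusterRow &row : rows)
  {
    if (row.mysql_role == "slave" && row.mysql_status == "on")
      slaves.push_back(row);
  }
  return slaves;
}
```

A client-side load balancer could re-run the query every few seconds and rebuild its write and read targets from the two results.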

Summary

Needless to say, I’m only scratching the surface. Nowhere in my code is a failover script run or a MySQL server reconfigured. I named some possible hooks for such actions. After years and years of extensive work by the MySQL community on how exactly to do the server reconfiguration, this part was of little interest to me. The server side is mostly solved. Just think of the introduction of GTIDs in MySQL 5.6.

Whether you go for an HA setup with or without a single point of failure is a question of demand. For some, a SPOF may be perfectly tolerable. For those requiring five nines – check out MySQL Cluster NDB 7.3. Still, if a SPOF can be avoided, avoid it. Embedding a GCS into a plugin cannot cure the fact that a MySQL master is a SPOF, but it can fix the monitoring SPOF. And if it was combined with DRBD, a standby master could be used – if, … if you really want it.

  Host A  
  MySQL Master (SPOF)  
  MySQL Isis2d daemon plugin (dumb Isis2 connector)  
  MySQL Isis2is I_S plugin (dumb Isis2 connector)  
  mono ./isis2_daemon.exe (many of them)  
Host B   Host C
MySQL Slave (many of them)   MySQL Slave
MySQL Isis2d daemon plugin   MySQL Isis2d daemon plugin
MySQL Isis2is I_S plugin   MySQL Isis2is I_S plugin
mono ./isis2_daemon.exe   mono ./isis2_daemon.exe

As a Connectors guy, I’m not happy with the current status. All too often I have to reply “manual client deployment” when someone asks me about growing or shrinking a MySQL Replication cluster. Even for failover, a virtual IP transfer may not always do the trick. Then again, it’s “manual client deployment”.

Deployment is annoying and time consuming. People go for centralized proxies as this means less to deploy. Centralized proxies can become bottlenecks, single points of failure and so forth. If only clients could learn about changes in the cluster topology, then continuously self-configuring client-side load balancers would become possible. However, I have the impression that today’s HA solutions care little about the client side. Weird: a server would be pointless without drivers and clients. If you neglect the clients, you keep the ease of use low at the end of the day.

Happy hacking!
