Mark McGranaghan

Getting Started with Zookeeper and Go

May 26 2014

Here we’ll see how to use the Zookeeper distributed coordination system from Go. We’ll set up a Zookeeper and Go environment using Vagrant, show how to perform common Zookeeper operations from the native Go client, and discuss some basic Zookeeper principles along the way.

Zookeeper and Go Environment

For the Go client examples described below, we need a multi-node Zookeeper ensemble (cluster) and a working Go environment. We’ll set this up using Vagrant, a tool for repeatably building development environments.

Download a recent release of Vagrant if you haven’t already.

Here’s our Vagrant config that defines our Zookeeper and Go environment. Put this in a Vagrantfile:

VAGRANTFILE_API_VERSION = "2"

Vagrant.configure(VAGRANTFILE_API_VERSION) do |config|
  ["1", "2", "3"].each do |n|
    config.vm.define "zk#{n}" do |zk_config|
      zk_config.vm.box = "hashicorp/precise64"
      zk_config.vm.network "private_network", :ip => "192.168.12.1#{n}"
      zk_config.vm.provision :shell, :path => "vm-zookeeper.sh", :args => n
    end
  end

  config.vm.define "go" do |go_config|
    go_config.vm.box = "hashicorp/precise64"
    go_config.vm.network "private_network", :ip => "192.168.12.10"
    go_config.vm.provision :shell, :path => "vm-go.sh"
  end
end

This config first defines 3 Zookeeper VMs (zk{1,2,3}) in which we’ll run our 3-member Zookeeper ensemble. Each VM will be based on the standard hashicorp/precise64 base box, providing an Ubuntu 12.04 machine. We’ll further configure each of these VMs with a vm-zookeeper.sh script:

#!/usr/bin/env bash

# Install Java and a stable Zookeeper release
apt-get update
apt-get install -y openjdk-7-jdk
cd /opt
wget -q http://apache.mirrors.pair.com/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar -xzf zookeeper-3.4.6.tar.gz

# Write this node's ID and the shared ensemble configuration
MYID=$1
mkdir -p /var/zookeeper/{data,conf}
echo -n $MYID > /var/zookeeper/data/myid
cat > /var/zookeeper/conf/zoo.cfg <<EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/zookeeper/data
clientPort=2181
server.1=192.168.12.11:2888:3888
server.2=192.168.12.12:2888:3888
server.3=192.168.12.13:2888:3888
EOF

# Run Zookeeper as an Upstart service
cat > /etc/init/zookeeper.conf <<EOF
exec /opt/zookeeper-3.4.6/bin/zkServer.sh start-foreground /var/zookeeper/conf/zoo.cfg
EOF
start zookeeper

This script installs Java and a stable release of Zookeeper. We configure Zookeeper with some standard settings in zoo.cfg and set up an Upstart script so that we can easily start and stop the service. Finally we call start zookeeper so that the service is running when we bring up our VMs.

The above will give us a Zookeeper ensemble running at 192.168.12.1{1,2,3}:2181. Now we need a Go environment from which to try it out. The final VM (go) described in our Vagrantfile will serve this purpose. Here’s the vm-go.sh script for configuring it:

#!/usr/bin/env bash

# Install build prerequisites and build Go from source at the release tag
apt-get update
apt-get install -y build-essential git-core mercurial
cd /opt
hg clone -u release https://code.google.com/p/go
cd go/src
./all.bash

# Point the vagrant user at the Go install and the Zookeeper ensemble
cat > /home/vagrant/.profile <<EOF
export GOPATH=\$HOME
export PATH=\$HOME/bin:/opt/go/bin:\$PATH
export ZOOKEEPER_SERVERS=192.168.12.11:2181,192.168.12.12:2181,192.168.12.13:2181
EOF
chown vagrant:vagrant /home/vagrant/.profile

sudo -u vagrant -i go get github.com/samuel/go-zookeeper/zk
sudo -u vagrant -i go get github.com/mmcgrana/zk

Here we install Go and its prerequisites, set up the default vagrant user’s environment to reference this Go install, and provide a ZOOKEEPER_SERVERS environment variable pointing at our Zookeeper ensemble. We also install the native Go Zookeeper library and the Go-based Zookeeper CLI that we’ll use for our experiments.

Either copy the files above or clone them from this GitHub repo. Then bring up your Vagrant environment with:

$ vagrant up

This will take some time to run. When it’s done, you should be ready to try out Zookeeper and Go locally with a complete environment.

Testing Out the Environment

To test out the environment, first SSH into the Go VM:

$ vagrant ssh go

And try contacting the Zookeeper ensemble using the zk CLI:

$ zk children /
zookeeper

You should see zookeeper as the single line of output from this command. If so, everything is working.
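
You can also check any Zookeeper node directly with the standard ruok four-letter command, to which a healthy server replies imok. For example, from the go VM (assuming nc is available on the box, as it typically is on Ubuntu 12.04):

$ echo ruok | nc 192.168.12.11 2181
imok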

Basic Operations

Now we’ll look at using the go-zookeeper Go package from our own code. Here’s some preamble code that we’ll use in all of our examples, including this first one:

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"os"
	"strings"
	"time"
)

// must panics on any non-nil error, keeping the examples brief.
func must(err error) {
	if err != nil {
		panic(err)
	}
}

// connect opens a session with the ensemble listed in ZOOKEEPER_SERVERS.
func connect() *zk.Conn {
	zksStr := os.Getenv("ZOOKEEPER_SERVERS")
	zks := strings.Split(zksStr, ",")
	conn, _, err := zk.Connect(zks, time.Second)
	must(err)
	return conn
}

This code imports the packages we’ll use, defines a must helper that we’ll use to check for runtime errors in our programs, and finally defines a connect function that returns a Zookeeper connection for the ensemble defined by ZOOKEEPER_SERVERS.

Here’s the rest of our first example file:

func main() {
	conn := connect()
	defer conn.Close()

	flags := int32(0)
	acl := zk.WorldACL(zk.PermAll)

	path, err := conn.Create("/01", []byte("data"), flags, acl)
	must(err)
	fmt.Printf("create: %+v\n", path)

	data, stat, err := conn.Get("/01")
	must(err)
	fmt.Printf("get:    %+v %+v\n", string(data), stat)

	stat, err = conn.Set("/01", []byte("newdata"), stat.Version)
	must(err)
	fmt.Printf("set:    %+v\n", stat)

	err = conn.Delete("/01", -1)
	must(err)
	fmt.Printf("delete: ok\n")

	exists, stat, err := conn.Exists("/01")
	must(err)
	fmt.Printf("exists: %+v %+v\n", exists, stat)
}

Here we see basic CRUD operations against znodes in Zookeeper: creating a znode with initial data, getting znode data, setting new znode data, deleting a znode, and checking for the existence of a znode.

You can run this code by entering your Go VM with vagrant ssh go, putting the source (the preamble above plus this main function) in e.g. ex-crud.go, and then running:

$ go run ex-crud.go

Note that keys are strings, but data (values) are bytes. Use []byte(str) to convert strings to bytes for data arguments.

Znodes are versioned in Zookeeper, which allows us to make atomic changes to their data based on previous data. This means that in order to use Set or Delete, we need to either provide the previous Version of the znode or a version of -1 to indicate that we’re willing to clobber whatever value might be there. The stat return value of e.g. Get will give this Version, which you can then supply to subsequent operations.
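
To make this concrete, here’s a sketch of an optimistic update built on this versioning: read the znode, transform its data, and write it back with the version we read, retrying if another client got there first. The updateZnode helper is our own illustration rather than part of the library, though zk.ErrBadVersion is the error go-zookeeper returns on a version mismatch. It assumes the preamble code above:

// updateZnode reads a znode, applies f to its data, and writes the result
// back using the version from the read. If another client changed the
// znode in between, Set fails with zk.ErrBadVersion and we retry.
func updateZnode(conn *zk.Conn, path string, f func([]byte) []byte) error {
	for {
		data, stat, err := conn.Get(path)
		if err != nil {
			return err
		}
		_, err = conn.Set(path, f(data), stat.Version)
		if err == zk.ErrBadVersion {
			continue // lost the race; re-read and try again
		}
		return err
	}
}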

The Create call takes arguments for flags and acl. We won’t cover ACLs in this post, but using zk.WorldACL(zk.PermAll) will work for all of our examples. For flags we’ll see a different option later.

In our next example we’ll look at hierarchical data in Zookeeper.

Managing Hierarchical Data

Zookeeper’s data model is hierarchical, much like a file system. A znode can have children, and levels in the hierarchy are separated by a /.

Here’s an example of managing hierarchical data from our Go client. Note that we’ll need the preamble code mentioned above for this example as well:

func main() {
	conn := connect()
	defer conn.Close()

	flags := int32(0)
	acl := zk.WorldACL(zk.PermAll)

	_, err := conn.Create("/dir", []byte("data-parent"), flags, acl)
	must(err)

	for i := 1; i <= 3; i++ {
		key := fmt.Sprintf("/dir/key%d", i)
		data := []byte(fmt.Sprintf("data-child-%d", i))
		path, err := conn.Create(key, data, flags, acl)
		must(err)
		fmt.Printf("%+v\n", path)
	}

	data, _, err := conn.Get("/dir")
	must(err)
	fmt.Printf("/dir: %s\n", string(data))

	children, _, err := conn.Children("/dir")
	must(err)
	for _, name := range children {
		data, _, err := conn.Get("/dir/" + name)
		must(err)
		fmt.Printf("/dir/%s: %s\n", name, string(data))
		err = conn.Delete("/dir/" + name, 0)
		must(err)
	}

	err = conn.Delete("/dir", 0)
	must(err)
}

In the first half of this example we create a parent znode and some child znodes, each with data. Note that child znodes are at the path parent-path + "/" + child-name.

We can use the Children method on the Zookeeper connection to enumerate the children of a given znode. In our example we use this to see all the children that we’ve created and then show their contents.

Note that in Zookeeper a znode with children can also have data of its own. This is unlike, say, Unix directories, which can’t hold file-style contents.

In the second half of the example we delete our znodes, starting with the child znodes and then finally deleting the parent node.

So far in our examples we’ve only used persistent znodes, which are durable until they’re explicitly removed, even if a client disconnects. In our next example we’ll look at ephemeral znodes.

Ephemeral Znodes

Zookeeper has an ephemeral znode feature that allows clients to create znodes that remain only as long as the client is connected to the Zookeeper ensemble. This allows systems using Zookeeper to track the liveness of servers (and the clients connecting from them) as those servers come up and go down.

Here’s some example code that demonstrates the ephemeral znode feature:

func main() {
	conn1 := connect()

	flags := int32(zk.FlagEphemeral)
	acl := zk.WorldACL(zk.PermAll)

	_, err := conn1.Create("/ephemeral", []byte("here"), flags, acl)
	must(err)

	conn2 := connect()
	defer conn2.Close()

	exists, _, err := conn2.Exists("/ephemeral")
	must(err)
	fmt.Printf("before disconnect: %+v\n", exists)

	conn1.Close()
	time.Sleep(time.Second * 2)

	exists, _, err = conn2.Exists("/ephemeral")
	must(err)
	fmt.Printf("after disconnect: %+v\n", exists)
}

In this example conn1 creates an ephemeral znode using the zk.FlagEphemeral constant. conn2 then checks for the existence of this znode. While conn1 is still open, this Exists call from conn2 returns true. But after we close conn1, the same Exists call from conn2 returns false, indicating as expected that conn1 is no longer active and the ephemeral znode that it created has vanished.
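
This behavior is the basis of the liveness tracking mentioned earlier: a server can announce itself by creating an ephemeral znode, and its entry disappears automatically when its session ends. Here’s a sketch of what such a registration helper might look like; the parent path and hostname-based naming are our own illustration, and the code assumes the preamble above plus a pre-existing parent znode:

// register announces this process as an ephemeral child of parent.
// The znode vanishes automatically when the connection's session ends.
func register(conn *zk.Conn, parent string) (string, error) {
	host, err := os.Hostname()
	if err != nil {
		return "", err
	}
	flags := int32(zk.FlagEphemeral)
	acl := zk.WorldACL(zk.PermAll)
	return conn.Create(parent + "/" + host, []byte("alive"), flags, acl)
}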

We can observe changes in znodes and in sets of children by polling (as in this example), but Zookeeper also provides a more efficient mechanism for observing changes. We’ll look at this in the next example.

Watches

Zookeeper provides a watches feature with which clients can receive notifications of changes to znodes: changes to their existence, to their data, or to their set of children.

Clients can use watches to quickly respond to changes in configuration data stored in Zookeeper.
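
As a first taste of the API, here’s a sketch using the GetW variant of Get, which additionally returns an event channel that fires when the znode’s data changes. Zookeeper watches fire only once, so the loop re-registers a watch on each iteration. The watchData helper is our own illustration and assumes the preamble above:

// watchData re-reads the data at path each time it changes. Watches are
// one-shot, so we re-establish one per iteration.
func watchData(conn *zk.Conn, path string) error {
	for {
		data, _, events, err := conn.GetW(path)
		if err != nil {
			return err
		}
		fmt.Printf("data: %s\n", string(data))
		evt := <-events
		if evt.Err != nil {
			return evt.Err
		}
	}
}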

Here’s a complete example that watches for the creation of a znode:

func main() {
	conn1 := connect()
	defer conn1.Close()

	flags := int32(zk.FlagEphemeral)
	acl := zk.WorldACL(zk.PermAll)

	found, _, ech, err := conn1.ExistsW("/watch")
	must(err)
	fmt.Printf("found: %t\n", found)

	conn2 := connect()
	defer conn2.Close()

	go func() {
		time.Sleep(time.Second * 3)
		fmt.Println("creating znode")
		_, err = conn2.Create("/watch", []byte("here"), flags, acl)
		must(err)
	}()

	evt := <-ech
	fmt.Println("watch fired")
	must(evt.Err)

	found, _, err = conn1.Exists("/watch")
	must(err)
	fmt.Printf("found: %t\n", found)
}

Running the code produces:

$ go run /vagrant/ex-watch.go
found: false
creating znode
watch fired
found: true

To start this example, conn1 checks for the existence of a znode at /watch, which we see doesn’t exist yet. By using the ExistsW variant instead of the standard Exists method, we get an additional return value: the ech (event channel) which will communicate events on our watched znode.

We then start a goroutine that uses conn2 to create a znode at our watched path a few seconds later. The evt := <-ech receive on conn1’s event channel blocks during this time. When conn2 creates the znode, the watch fires and the receive succeeds. We check that we didn’t receive an error event and, assuming we didn’t, query the existence of the /watch znode again, confirming that it does indeed now exist.

In this example we only watched for changes in the presence of a znode, and only observed a single change. Watches can also be composed and repeated to monitor more complex changes in Zookeeper state. We’ll see an example of this below.

Continuous Watches to Mirror State

A common problem in system configuration is keeping an up-to-date record of e.g. which servers are online. We can use Zookeeper to solve this problem by having each server register as a child of a known znode when it’s alive. We can furthermore take advantage of Zookeeper’s watch functionality to receive notifications whenever this set of children changes. By continuously watching the known znode, we can effectively mirror a subset of Zookeeper state in our client process.

Here’s an example of how this might work. To start, we’ll define a function mirror that will use a given *zk.Conn to continuously watch the children at a specified path, emitting to a channel a snapshot of the set of children whenever it detects a change:

// mirror emits the current set of children at path, then a fresh snapshot
// each time that set changes, until an error occurs.
func mirror(conn *zk.Conn, path string) (chan []string, chan error) {
	snapshots := make(chan []string)
	errors := make(chan error)
	go func() {
		for {
			snapshot, _, events, err := conn.ChildrenW(path)
			if err != nil {
				errors <- err
				return
			}
			snapshots <- snapshot
			evt := <-events
			if evt.Err != nil {
				errors <- evt.Err
				return
			}
		}
	}()
	return snapshots, errors
}

The basic flow of this code is to enter a loop in which we get the current set of children, watch for a change notification on that set of children, and then either restart the loop to get the new set of children or emit an error if appropriate.

Here’s some code demonstrating this functionality:

func main() {
	conn1 := connect()
	defer conn1.Close()

	flags := int32(zk.FlagEphemeral)
	acl := zk.WorldACL(zk.PermAll)

	snapshots, errors := mirror(conn1, "/mirror")
	go func() {
		for {
			select {
			case snapshot := <-snapshots:
				fmt.Printf("%+v\n", snapshot)
			case err := <-errors:
				panic(err)
			}
		}
	}()

	conn2 := connect()
	time.Sleep(time.Second)

	_, err := conn2.Create("/mirror/one", []byte("one"), flags, acl)
	must(err)
	time.Sleep(time.Second)

	_, err = conn2.Create("/mirror/two", []byte("two"), flags, acl)
	must(err)
	time.Sleep(time.Second)

	err = conn2.Delete("/mirror/two", 0)
	must(err)
	time.Sleep(time.Second)

	_, err = conn2.Create("/mirror/three", []byte("three"), flags, acl)
	must(err)
	time.Sleep(time.Second)

	conn2.Close()
	time.Sleep(time.Second)
}

Run the code like this, being sure to create the persistent /mirror znode before starting the example:

$ echo -n "" | zk create /mirror
$ go run ex-mirror.go
[]
[one]
[two one]
[one]
[three one]
[]

Each second, we receive a new snapshot in the goroutine that we created with conn1 to mirror the set of children at /mirror. These snapshots reflect the series of changes we make from conn2, for example adding a new child with name one then two, deleting two, etc. Finally the snapshot becomes empty when we close conn2 and all the znodes vanish, as the znodes were configured to be ephemeral.

Conclusion

Here we’ve shown how to create a basic Zookeeper + Go environment and how to perform various Zookeeper operations from a Go client. We hope you’ve found this information useful and that you’ll give it a try yourself.

Zookeeper is a sophisticated system that we’ve only begun to explore in this post. If you’re interested in learning more, we suggest the Zookeeper website and this Zookeeper book.

Note that Zookeeper was originally designed for use only by C and Java clients. Before diving in with Go in production, we suggest checking out at least the Java client and the Curator extensions built on top of it.

All code in this post is available in this GitHub repo for further reference.

We welcome feedback on this article. Please contact the author with your ideas and suggestions.