summaryrefslogtreecommitdiff
path: root/workers
diff options
context:
space:
mode:
authorAleksander Mistewicz <a.mistewicz@samsung.com>2018-05-29 11:20:44 +0200
committerAleksander Mistewicz <a.mistewicz@samsung.com>2018-08-03 13:36:38 +0200
commit1a9e53461bbde08e3c2235d07c2c89d8731d16a7 (patch)
tree7343bebc11299474986d83d9c467efedb5fb5953 /workers
parent0ca27786ce5a62e0be92112c066044022a657cd3 (diff)
downloadboruta-1a9e53461bbde08e3c2235d07c2c89d8731d16a7.tar.gz
boruta-1a9e53461bbde08e3c2235d07c2c89d8731d16a7.tar.bz2
boruta-1a9e53461bbde08e3c2235d07c2c89d8731d16a7.zip
Add dryadAddress and sshAddress to Register
When running multiple dryads on a single host or behind NAT, they must listen on different ports and inform boruta about this fact. Change-Id: I35e084b8ee2e2177d36055f7dedacb53ac74bbf0 Signed-off-by: Aleksander Mistewicz <a.mistewicz@samsung.com>
Diffstat (limited to 'workers')
-rw-r--r--workers/worker_list_test.go38
-rw-r--r--workers/workers.go44
-rw-r--r--workers/workers_test.go2
3 files changed, 61 insertions, 23 deletions
diff --git a/workers/worker_list_test.go b/workers/worker_list_test.go
index b3b0ddd..1c85098 100644
--- a/workers/worker_list_test.go
+++ b/workers/worker_list_test.go
@@ -36,6 +36,14 @@ import (
var _ = Describe("WorkerList", func() {
var wl *WorkerList
+ dryadAddr := net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 7175,
+ }
+ sshdAddr := net.TCPAddr{
+ IP: net.IPv4(127, 0, 0, 1),
+ Port: 22,
+ }
BeforeEach(func() {
wl = NewWorkerList()
})
@@ -81,7 +89,7 @@ var _ = Describe("WorkerList", func() {
}
It("should fail if UUID is not present", func() {
- err := wl.Register(nil)
+ err := wl.Register(nil, "", "")
Expect(err).To(Equal(ErrMissingUUID))
})
@@ -93,7 +101,7 @@ var _ = Describe("WorkerList", func() {
It("should add Worker in MAINTENANCE state", func() {
caps := getRandomCaps()
- err := wl.Register(caps)
+ err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
uuid := WorkerUUID(caps[UUID])
wl.mutex.RLock()
@@ -110,14 +118,14 @@ var _ = Describe("WorkerList", func() {
caps := getRandomCaps()
By("registering worker")
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps[UUID])
compareLists()
By("updating the caps")
caps["test-key"] = "test-value"
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
wl.mutex.RLock()
Expect(wl.workers[WorkerUUID(caps[UUID])].Caps).To(Equal(caps))
@@ -133,7 +141,7 @@ var _ = Describe("WorkerList", func() {
caps := getRandomCaps()
By("registering first worker")
- err = wl.Register(caps)
+ err = wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps[UUID])
compareLists()
@@ -148,13 +156,13 @@ var _ = Describe("WorkerList", func() {
caps2 := getRandomCaps()
By("registering first worker")
- err = wl.Register(caps1)
+ err = wl.Register(caps1, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps1[UUID])
compareLists()
By("registering second worker")
- err = wl.Register(caps2)
+ err = wl.Register(caps2, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
registeredWorkers = append(registeredWorkers, caps2[UUID])
compareLists()
@@ -172,13 +180,13 @@ var _ = Describe("WorkerList", func() {
return newUUID
}
registerWorker := func() WorkerUUID {
- capsUUID := getUUID()
- err := wl.Register(Capabilities{UUID: capsUUID})
+ capsUUID := randomUUID()
+ err := wl.Register(Capabilities{UUID: string(capsUUID)}, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
wl.mutex.RLock()
Expect(wl.workers).ToNot(BeEmpty())
wl.mutex.RUnlock()
- return WorkerUUID(capsUUID)
+ return capsUUID
}
BeforeEach(func() {
@@ -343,7 +351,8 @@ var _ = Describe("WorkerList", func() {
info, ok = wl.workers[worker]
Expect(ok).To(BeTrue())
Expect(info.key).To(BeNil())
- info.ip = ip
+ info.dryad = new(net.TCPAddr)
+ info.dryad.IP = ip
wl.mutex.Unlock()
})
AfterEach(func() {
@@ -528,7 +537,7 @@ var _ = Describe("WorkerList", func() {
registerAndSetGroups := func(groups Groups, caps Capabilities) WorkerInfo {
capsUUID := getUUID()
caps[UUID] = capsUUID
- err := wl.Register(caps)
+ err := wl.Register(caps, dryadAddr.String(), sshdAddr.String())
Expect(err).ToNot(HaveOccurred())
workerID := WorkerUUID(capsUUID)
@@ -818,7 +827,8 @@ var _ = Describe("WorkerList", func() {
info, ok = wl.workers[worker]
Expect(ok).To(BeTrue())
Expect(info.key).To(BeNil())
- info.ip = ip
+ info.dryad = new(net.TCPAddr)
+ info.dryad.IP = ip
})
It("should set worker into IDLE state and prepare a key", func() {
gomock.InOrder(
@@ -930,7 +940,7 @@ var _ = Describe("WorkerList", func() {
workerUUID := WorkerUUID(capsUUID)
caps[UUID] = capsUUID
- wl.Register(caps)
+ wl.Register(caps, dryadAddr.String(), sshdAddr.String())
wl.mutex.RLock()
w, ok := wl.workers[workerUUID]
wl.mutex.RUnlock()
diff --git a/workers/workers.go b/workers/workers.go
index 4771226..6e3dd7c 100644
--- a/workers/workers.go
+++ b/workers/workers.go
@@ -19,6 +19,7 @@ package workers
import (
"crypto/rsa"
+ "errors"
"math"
"net"
"sync"
@@ -35,8 +36,9 @@ const UUID string = "UUID"
// (public and private) structures representing Worker.
type mapWorker struct {
WorkerInfo
- ip net.IP
- key *rsa.PrivateKey
+ dryad *net.TCPAddr
+ sshd *net.TCPAddr
+ key *rsa.PrivateKey
}
// WorkerList implements Superviser and Workers interfaces.
@@ -74,26 +76,51 @@ func NewWorkerList() *WorkerList {
}
// Register is an implementation of Register from Superviser interface.
-// UUID, which identifies Worker, must be present in caps.
-func (wl *WorkerList) Register(caps Capabilities) error {
+// UUID, which identifies Worker, must be present in caps. Both dryadAddress and
+// sshAddress must resolve and parse to net.TCPAddr. Neither IP address nor port number
+// can not be ommited.
+func (wl *WorkerList) Register(caps Capabilities, dryadAddress string, sshAddress string) error {
capsUUID, present := caps[UUID]
if !present {
return ErrMissingUUID
}
uuid := WorkerUUID(capsUUID)
+
+ dryad, err := net.ResolveTCPAddr("tcp", dryadAddress)
+ if err != nil {
+ return err
+ }
+ // dryad.IP is empty if dryadAddress provided port number only.
+ if dryad.IP == nil {
+ return errors.New("missing IP in dryad address")
+ }
+ sshd, err := net.ResolveTCPAddr("tcp", sshAddress)
+ if err != nil {
+ return err
+ }
+ // same as with dryad.IP
+ if sshd.IP == nil {
+ return errors.New("missing IP in ssh address")
+ }
+
wl.mutex.Lock()
defer wl.mutex.Unlock()
worker, registered := wl.workers[uuid]
if registered {
- // Subsequent Register calls update the caps.
+ // Subsequent Register calls update the caps and addresses.
worker.Caps = caps
+ worker.dryad = dryad
+ worker.sshd = sshd
} else {
wl.workers[uuid] = &mapWorker{
WorkerInfo: WorkerInfo{
WorkerUUID: uuid,
State: MAINTENANCE,
Caps: caps,
- }}
+ },
+ dryad: dryad,
+ sshd: sshd,
+ }
}
return nil
}
@@ -266,7 +293,8 @@ func (wl *WorkerList) SetWorkerIP(uuid WorkerUUID, ip net.IP) error {
if !ok {
return ErrWorkerNotFound
}
- worker.ip = ip
+ // FIXME
+ worker.dryad.IP = ip
return nil
}
@@ -278,7 +306,7 @@ func (wl *WorkerList) GetWorkerIP(uuid WorkerUUID) (net.IP, error) {
if !ok {
return nil, ErrWorkerNotFound
}
- return worker.ip, nil
+ return worker.dryad.IP, nil
}
// SetWorkerKey stores private key in the worker structure referenced by uuid.
diff --git a/workers/workers_test.go b/workers/workers_test.go
index 3a49000..43f25da 100644
--- a/workers/workers_test.go
+++ b/workers/workers_test.go
@@ -57,7 +57,7 @@ var _ = Describe("WorkerList", func() {
}
b.Time("register", func() {
for i := 0; i < maximumWorkers; i++ {
- err := wl.Register(caps[i])
+ err := wl.Register(caps[i], "127.0.0.1:7175", "127.0.0.1:22")
Expect(err).ToNot(HaveOccurred())
}
})