summaryrefslogtreecommitdiff
path: root/requests
diff options
context:
space:
mode:
authorMaciej Wereski <m.wereski@partner.samsung.com>2017-07-07 12:54:40 +0200
committerMaciej Wereski <m.wereski@partner.samsung.com>2017-10-12 18:43:41 +0200
commit554ddf2e6941f850678f8273dfe5788e593c0ea7 (patch)
treedf85700706661923ed8ee94d97c9d59f14058fe6 /requests
parent820f58d66aeadbbe06ffc88519c0e5e4ab983766 (diff)
downloadboruta-554ddf2e6941f850678f8273dfe5788e593c0ea7.tar.gz
boruta-554ddf2e6941f850678f8273dfe5788e593c0ea7.tar.bz2
boruta-554ddf2e6941f850678f8273dfe5788e593c0ea7.zip
Initial Request Queue implementation
Package requests provides Requests and ListFilter interfaces implementation. Users of this package can create new Boruta requests, list them (with filter), get detail information about them or modify it. What still needs to be done is interaction with worker mostly. This needs working implementations of Workers interface and appropriate matcher. Change-Id: Ia4458f1959f7e52fdbe1df5f89c86785f5db3e70 Signed-off-by: Maciej Wereski <m.wereski@partner.samsung.com> Reviewed-on: https://mcdsrvbld02.digital.local/review/49522 Reviewed-by: Aleksander Mistewicz <a.mistewicz@samsung.com> Tested-by: Aleksander Mistewicz <a.mistewicz@samsung.com> Reviewed-by: Pawel Wieczorek <p.wieczorek2@samsung.com>
Diffstat (limited to 'requests')
-rw-r--r--requests/errors.go38
-rw-r--r--requests/queue.go159
-rw-r--r--requests/queue_test.go92
-rw-r--r--requests/requests.go238
-rw-r--r--requests/requests_test.go484
5 files changed, 1011 insertions, 0 deletions
diff --git a/requests/errors.go b/requests/errors.go
new file mode 100644
index 0000000..75f830b
--- /dev/null
+++ b/requests/errors.go
@@ -0,0 +1,38 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File requests/errors.go provides errors that may occur when handling requests.
+
+package requests
+
+import "errors"
+
+var (
+ // ErrPriority means that requested priority is out of bounds.
+ // Either general bounds (HiPrio, LoPrio) or allowed priority bounds for
+ // request owner.
+ ErrPriority = errors.New("requested priority out of bounds")
+ // ErrInvalidTimeRange means that requested ValidAfter date is after Deadline value.
+ ErrInvalidTimeRange = errors.New("requested time range illegal - ValidAfter must be before Deadline")
+ // ErrDeadlineInThePast means that requested Deadline date is in the past.
+ ErrDeadlineInThePast = errors.New("Deadline in the past")
+ // ErrWorkerNotAssigned means that user tries to call method which regards
+ // worker although worker wasn't assigned yet.
+ ErrWorkerNotAssigned = errors.New("worker not assigned")
+ // ErrModificationForbidden means that user tries to modify request which
+ // is in state that forbids any changes.
+ ErrModificationForbidden = errors.New("action cannot be executed in current state")
+)
diff --git a/requests/queue.go b/requests/queue.go
new file mode 100644
index 0000000..f91663b
--- /dev/null
+++ b/requests/queue.go
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File requests/queue.go file contains implementation of Priority Queue for
+// requests. It's done as an array of regular FIFO queues - one per priority.
+
+package requests
+
+import (
+ "container/list"
+ "sync"
+
+ . "git.tizen.org/tools/boruta"
+)
+
+// prioQueue is priority queue that stores request IDs.
+// Following part of interface should be used:
+// - pushRequest()
+// - removeRequest()
+// - setRequestPriority()
+// - initIterator()
+// - releaseIterator()
+// - next()
+type prioQueue struct {
+ queue []*list.List
+ length uint
+ // next returns ID of next request in the priority queue and bool which
+ // indicates if ID was found. False means that caller has iterated through
+ // all elements and pq.releaseIterator() followed by pq.initIterator()
+ // must be called in order to have a working iterator again.
+ next func() (ReqID, bool)
+ mtx *sync.Mutex
+}
+
+// _emptyIterator is helper function which always returns values which indicate
+// that iterator should be initialized. It is desired to be set as next member of
+// prioQueue structure whenever iterator needs initialization.
+func _emptyIterator() (ReqID, bool) { return ReqID(0), false }
+
+// newPrioQueue returns pointer to newly created and initialized priority queue.
+func newPrioQueue() *prioQueue {
+ pq := new(prioQueue)
+
+ // Prepare queues.
+ pq.queue = make([]*list.List, LoPrio+1)
+ for i := HiPrio; i <= LoPrio; i++ {
+ pq.queue[i] = new(list.List).Init()
+ }
+ pq.length = 0
+ pq.mtx = new(sync.Mutex)
+
+ // Prepare iterator.
+ pq.next = _emptyIterator
+
+ return pq
+}
+
+// _remove removes request with given reqID from the queue. Caller must be sure
+// that request with given ID exists in the queue otherwise function will panic.
+// It's more convenient to use removeRequest().
+func (pq *prioQueue) _remove(reqID ReqID, priority Priority) {
+ for e := pq.queue[priority].Front(); e != nil; e = e.Next() {
+ if e.Value.(ReqID) == reqID {
+ pq.length--
+ pq.queue[priority].Remove(e)
+ return
+ }
+ }
+ panic("request with given reqID doesn't exist in the queue")
+}
+
+// removeRequest removes request from the priority queue. It wraps _remove(),
+// which will panic if request is missing from the queue and removeRequest will
+// propagate this panic.
+func (pq *prioQueue) removeRequest(req *ReqInfo) {
+ pq.mtx.Lock()
+ defer pq.mtx.Unlock()
+ pq._remove(req.ID, req.Priority)
+}
+
+// _push adds request ID at the end of priority queue. It's more convenient to use
+// pushRequest().
+func (pq *prioQueue) _push(reqID ReqID, priority Priority) {
+ pq.queue[priority].PushBack(reqID)
+ pq.length++
+}
+
+// pushRequest adds request to priority queue. It wraps _push().
+func (pq *prioQueue) pushRequest(req *ReqInfo) {
+ pq.mtx.Lock()
+ defer pq.mtx.Unlock()
+ pq._push(req.ID, req.Priority)
+}
+
+// setRequestPriority modifies priority of request that was already added to the
+// queue. Caller must make sure that request with given ID exists in the queue.
+// Panic will occur if such ID doesn't exist.
+func (pq *prioQueue) setRequestPriority(req *ReqInfo, newPrio Priority) {
+ pq.mtx.Lock()
+ defer pq.mtx.Unlock()
+ pq._remove(req.ID, req.Priority)
+ pq._push(req.ID, newPrio)
+}
+
+// initIterator initializes iterator. Caller must call it before first call to pq.next().
+// If caller wants to iterate once again through the queue (e.g. after pq.next()
+// returns (0, false)), then (s)he must call pq.releaseIterator and initIterator()
+// once again.
+func (pq *prioQueue) initIterator() {
+ pq.mtx.Lock()
+ // current priority
+ p := HiPrio
+ // current element of list for p priority
+ e := pq.queue[p].Front()
+
+ pq.next = func() (id ReqID, ok bool) {
+
+ // The queue is empty.
+ if pq.length == 0 {
+ p = HiPrio
+ e = nil
+ return ReqID(0), false
+ }
+
+ if e == nil {
+ // Find next priority.
+ for p++; p <= LoPrio && pq.queue[p].Len() == 0; p++ {
+ }
+ if p > LoPrio {
+ return ReqID(0), false
+ }
+ // Get it's first element.
+ e = pq.queue[p].Front()
+ }
+
+ id, ok = e.Value.(ReqID), true
+ e = e.Next()
+ return
+ }
+}
+
+// releaseIterator must be called after user finishes iterating through the queue.
+func (pq *prioQueue) releaseIterator() {
+ pq.next = _emptyIterator
+ pq.mtx.Unlock()
+}
diff --git a/requests/queue_test.go b/requests/queue_test.go
new file mode 100644
index 0000000..2359d5a
--- /dev/null
+++ b/requests/queue_test.go
@@ -0,0 +1,92 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// File requests/queue_test.go contains additional tests for queue.go. Please
+// take a look at requests_test.go for initTest() and requestsTests definition.
+
+package requests
+
+import (
+ "testing"
+
+ . "git.tizen.org/tools/boruta"
+)
+
+func TestRemovePanic(t *testing.T) {
+ assert, rqueue := initTest(t)
+ assert.Panics(func() { rqueue.queue._remove(ReqID(1), LoPrio) })
+}
+
+func TestQueue(t *testing.T) {
+ assert, rqueue := initTest(t)
+ var reqs = []struct {
+ id ReqID
+ pr Priority
+ }{
+ {ReqID(1), Priority(7)},
+ {ReqID(2), Priority(0)},
+ {ReqID(3), Priority(1)},
+ {ReqID(4), Priority(12)},
+ {ReqID(5), Priority(2)},
+ {ReqID(6), Priority(2)},
+ }
+ sorted := []ReqID{ReqID(2), ReqID(3), ReqID(5), ReqID(6), ReqID(1), ReqID(4)}
+
+ // Test for empty queue.
+ reqid, ok := rqueue.queue.next()
+ assert.False(ok)
+ assert.Equal(ReqID(0), reqid)
+
+ // Test if iterator was initialized and queue is empty.
+ rqueue.queue.initIterator()
+ reqid, ok = rqueue.queue.next()
+ assert.False(ok)
+ assert.Equal(ReqID(0), reqid)
+ rqueue.queue.releaseIterator()
+
+ req := requestsTests[0].req
+ // Push requests to the queue.
+ for _, r := range reqs {
+ _, err := rqueue.NewRequest(req.Caps, r.pr, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+ }
+
+ // Check if queue returns request IDs in proper order.
+ rqueue.queue.initIterator()
+ for _, r := range sorted {
+ reqid, ok = rqueue.queue.next()
+ assert.True(ok)
+ assert.Equal(r, reqid)
+ }
+
+ // Check if call to next() after iterating through whole queue returns false.
+ reqid, ok = rqueue.queue.next()
+ assert.False(ok)
+ assert.Equal(ReqID(0), reqid)
+ rqueue.queue.releaseIterator()
+
+ // Check if after another initialization next() returns first element.
+ rqueue.queue.initIterator()
+ reqid, ok = rqueue.queue.next()
+ assert.True(ok)
+ assert.Equal(sorted[0], reqid)
+ // Check call to releaseIterator() when iterator hasn't finished properly
+ // sets next().
+ rqueue.queue.releaseIterator()
+ reqid, ok = rqueue.queue.next()
+ assert.False(ok)
+ assert.Equal(ReqID(0), reqid)
+}
diff --git a/requests/requests.go b/requests/requests.go
new file mode 100644
index 0000000..b522aca
--- /dev/null
+++ b/requests/requests.go
@@ -0,0 +1,238 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+// Package requests provides structures and functions to handle requests.
+package requests
+
+import (
+ "time"
+
+ . "git.tizen.org/tools/boruta"
+)
+
+// ReqsCollection contains information (also historical) about handled requests.
+// It implements Requests interface.
+type ReqsCollection struct {
+ requests map[ReqID]*ReqInfo
+ queue *prioQueue
+}
+
+// NewRequestQueue provides initialized priority queue for requests.
+func NewRequestQueue() *ReqsCollection {
+ return &ReqsCollection{
+ requests: make(map[ReqID]*ReqInfo),
+ queue: newPrioQueue(),
+ }
+}
+
+// NewRequest is part of implementation of Requests interface. It validates
+// provided arguments and creates request or returns an error. Caller must make
+// sure that provided time values are in UTC.
+func (reqs *ReqsCollection) NewRequest(caps Capabilities,
+ priority Priority, owner UserInfo, validAfter time.Time,
+ deadline time.Time) (ReqID, error) {
+
+ req := &ReqInfo{
+ ID: ReqID(len(reqs.requests) + 1),
+ Priority: priority,
+ Owner: owner,
+ Deadline: deadline,
+ ValidAfter: validAfter,
+ State: WAIT,
+ Caps: caps,
+ }
+
+ if !req.Deadline.IsZero() && req.Deadline.Before(time.Now().UTC()) {
+ return 0, ErrDeadlineInThePast
+ }
+
+ if req.ValidAfter.After(req.Deadline) && !req.Deadline.IsZero() {
+ return 0, ErrInvalidTimeRange
+ }
+
+ if req.ValidAfter.IsZero() {
+ req.ValidAfter = time.Now().UTC()
+ }
+ if req.Deadline.IsZero() {
+ // TODO(mwereski): make defaultValue configurable when config is
+ // introduced
+ req.Deadline = time.Now().AddDate(0, 1, 0).UTC()
+ }
+
+ // TODO(mwereski): Check if user has rights to set given priority.
+ if req.Priority < HiPrio || req.Priority > LoPrio {
+ return 0, ErrPriority
+ }
+
+ // TODO(mwereski): Check if capabilities can be satisfied.
+
+ reqs.queue.pushRequest(req)
+ reqs.requests[req.ID] = req
+
+ return req.ID, nil
+}
+
+// CloseRequest is part of implementation of Requests interface. It checks that
+// request is in WAIT state and changes it to CANCEL or in INPROGRESS state and
+// changes it to DONE. NotFoundError may be returned if request with given reqID
+// doesn't exist in the queue or ErrModificationForbidden if request is in state
+// which can't be closed.
+func (reqs *ReqsCollection) CloseRequest(reqID ReqID) error {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ switch req.State {
+ case WAIT:
+ req.State = CANCEL
+ reqs.queue.removeRequest(req)
+ case INPROGRESS:
+ req.State = DONE
+ // TODO(mwereski): release worker
+ default:
+ return ErrModificationForbidden
+ }
+ return nil
+}
+
+// modificationPossible is simple helper function that checks if it is possible
+// to modify request it given state.
+func modificationPossible(state ReqState) bool {
+ return state == WAIT
+}
+
+// SetRequestPriority is part of implementation of Requests interface. It will
+// change priority for given request ID only if modification of request is
+// possible. NotFoundError, ErrModificationForbidden or ErrPriority (if given
+// priority is out of bounds) may be returned.
+func (reqs *ReqsCollection) SetRequestPriority(reqID ReqID, priority Priority) error {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ // TODO(mwereski): Check if user has rights to set given priority.
+ if priority < HiPrio || priority > LoPrio {
+ return ErrPriority
+ }
+ if !modificationPossible(req.State) {
+ return ErrModificationForbidden
+ }
+ if priority == req.Priority {
+ return nil
+ }
+ reqs.queue.setRequestPriority(req, priority)
+ req.Priority = priority
+ return nil
+}
+
+// SetRequestValidAfter is part of implementation of Requests interface.
+// It changes date after which request will be sent to worker. Provided time is
+// converted to UTC. Request must exist, must be in WAIT state and given date
+// must be before deadline of request. Otherwise NotFoundError,
+// ErrModificationForbidden or ErrInvalidTimeRange will be returned.
+func (reqs *ReqsCollection) SetRequestValidAfter(reqID ReqID, validAfter time.Time) error {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ if !modificationPossible(req.State) {
+ return ErrModificationForbidden
+ }
+ if validAfter.After(req.Deadline) && !req.Deadline.IsZero() {
+ return ErrInvalidTimeRange
+ }
+ req.ValidAfter = validAfter
+ // TODO(mwereski): check if request is ready to go.
+ return nil
+}
+
+// SetRequestDeadline is part of implementation of Requests interface. It changes
+// date before which request must be sent to worker. Provided time is converted
+// to UTC. Request must exist, must be in WAIT state. Given date must be in the
+// future and must be after ValidAfer. In case of not meeting these constrains
+// following errors are returned: NotFoundError, ErrModificationForbidden,
+// ErrDeadlineInThePast and ErrInvalidTimeRange.
+func (reqs *ReqsCollection) SetRequestDeadline(reqID ReqID, deadline time.Time) error {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ if !modificationPossible(req.State) {
+ return ErrModificationForbidden
+ }
+ if !deadline.IsZero() && deadline.Before(time.Now().UTC()) {
+ return ErrDeadlineInThePast
+ }
+ if !deadline.IsZero() && deadline.Before(req.ValidAfter) {
+ return ErrInvalidTimeRange
+ }
+ req.Deadline = deadline
+ // TODO(mwereski): check if request is ready to go.
+ return nil
+}
+
+// GetRequestInfo is part of implementation of Requests interface. It returns
+// ReqInfo for given request ID or NotFoundError if request with given ID doesn't
+// exits in the collection.
+func (reqs *ReqsCollection) GetRequestInfo(reqID ReqID) (ReqInfo, error) {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return ReqInfo{}, NotFoundError("Request")
+ }
+ return *req, nil
+}
+
+// ListRequests is part of implementation of Requests interface. It returns slice
+// of ReqInfo that matches ListFilter.
+func (reqs *ReqsCollection) ListRequests(filter ListFilter) ([]ReqInfo, error) {
+ res := make([]ReqInfo, 0, len(reqs.requests))
+ for _, req := range reqs.requests {
+ if filter == nil || filter.Match(req) {
+ res = append(res, *req)
+ }
+ }
+ return res, nil
+}
+
+// AcquireWorker is part of implementation of Requests interface. When worker is
+// assigned to the requests then owner of such requests may call AcquireWorker
+// to get all information required to use assigned worker.
+func (reqs *ReqsCollection) AcquireWorker(reqID ReqID) (AccessInfo, error) {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return AccessInfo{}, NotFoundError("Request")
+ }
+ if req.State != INPROGRESS || req.Job == nil {
+ return AccessInfo{}, ErrWorkerNotAssigned
+ }
+ // TODO(mwereski): create job and get access info
+ return AccessInfo{}, nil
+}
+
+// ProlongAccess is part of implementation of Requests interface. When owner of
+// the request has acquired worker that to extend time for which the worker is
+// assigned to the request.
+func (reqs *ReqsCollection) ProlongAccess(reqID ReqID) error {
+ req, ok := reqs.requests[reqID]
+ if !ok {
+ return NotFoundError("Request")
+ }
+ if req.State != INPROGRESS || req.Job == nil {
+ return ErrWorkerNotAssigned
+ }
+ // TODO(mwereski): prolong access
+ return nil
+}
diff --git a/requests/requests_test.go b/requests/requests_test.go
new file mode 100644
index 0000000..368a869
--- /dev/null
+++ b/requests/requests_test.go
@@ -0,0 +1,484 @@
+/*
+ * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License
+ */
+
+package requests
+
+import (
+ "strconv"
+ "testing"
+ "time"
+
+ . "git.tizen.org/tools/boruta"
+ "github.com/stretchr/testify/assert"
+)
+
+var (
+ owner UserInfo
+ job JobInfo
+ zeroTime time.Time
+ caps = make(Capabilities)
+ now = time.Now().UTC()
+ yesterday = now.AddDate(0, 0, -1).UTC()
+ tomorrow = now.AddDate(0, 0, 1).UTC()
+)
+
+var requestsTests = [...]struct {
+ req ReqInfo
+ err error
+}{
+ {
+ // valid request
+ req: ReqInfo{ReqID(1), Priority((HiPrio + LoPrio) / 2), owner, tomorrow, now, WAIT, &job, caps},
+ err: nil,
+ },
+ {
+ // request with invalid priority
+ req: ReqInfo{ReqID(0), Priority(LoPrio + 1), owner, tomorrow, now, WAIT, &job, caps},
+ err: ErrPriority,
+ },
+ {
+ // request with ValidAfter date newer then Deadline
+ req: ReqInfo{ReqID(0), Priority((HiPrio + LoPrio) / 2), owner, now.Add(time.Hour), tomorrow, WAIT, &job, caps},
+ err: ErrInvalidTimeRange,
+ },
+ {
+ // request with Deadline set in the past.
+ req: ReqInfo{ReqID(0), Priority((HiPrio + LoPrio) / 2), owner, yesterday, now, WAIT, &job, caps},
+ err: ErrDeadlineInThePast,
+ },
+}
+
+func initTest(t *testing.T) (*assert.Assertions, *ReqsCollection) {
+ return assert.New(t), NewRequestQueue()
+}
+
+func TestNewRequestQueue(t *testing.T) {
+ assert, q := initTest(t)
+
+ assert.Zero(len(q.requests))
+ assert.NotNil(q.queue)
+ assert.Zero(q.queue.length)
+}
+
+func TestNewRequest(t *testing.T) {
+ assert, rqueue := initTest(t)
+
+ for _, test := range requestsTests {
+ reqid, err := rqueue.NewRequest(test.req.Caps, test.req.Priority,
+ test.req.Owner, test.req.ValidAfter, test.req.Deadline)
+ assert.Equal(test.req.ID, reqid)
+ assert.Equal(test.err, err)
+ }
+
+ req := requestsTests[0].req
+ req.Deadline = zeroTime
+ req.ValidAfter = zeroTime
+ start := time.Now()
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner,
+ req.ValidAfter, req.Deadline)
+ stop := time.Now()
+ assert.Nil(err)
+ res := rqueue.requests[reqid]
+ assert.True(start.Before(res.ValidAfter) && stop.After(res.ValidAfter))
+ start = start.AddDate(0, 1, 0)
+ stop = stop.AddDate(0, 1, 0)
+ assert.True(start.Before(res.Deadline) && stop.After(res.Deadline))
+ assert.EqualValues(2, rqueue.queue.length)
+}
+
+func TestCloseRequest(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+
+ // Add valid request to the queue.
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ // Cancel previously added request.
+ assert.EqualValues(1, rqueue.queue.length)
+ err = rqueue.CloseRequest(reqid)
+ assert.Nil(err)
+ assert.Equal(ReqState(CANCEL), rqueue.requests[reqid].State)
+ assert.Zero(rqueue.queue.length)
+
+ // Try to close non-existent request.
+ err = rqueue.CloseRequest(ReqID(2))
+ assert.Equal(NotFoundError("Request"), err)
+
+ // Add another valid request.
+ reqid, err = rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+ assert.EqualValues(2, reqid)
+ // Simulate situation where request was assigned a worker and job has begun.
+ reqinfo, err := rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ rqueue.requests[reqid].State = INPROGRESS
+ rqueue.queue.removeRequest(&reqinfo)
+ // Close request.
+ err = rqueue.CloseRequest(reqid)
+ assert.Nil(err)
+ assert.EqualValues(2, len(rqueue.requests))
+ assert.Equal(ReqState(DONE), rqueue.requests[reqid].State)
+
+ // Simulation for the rest of states.
+ states := [...]ReqState{INVALID, CANCEL, TIMEOUT, DONE, FAILED}
+ reqid, err = rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+ assert.EqualValues(3, reqid)
+ reqinfo, err = rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ rqueue.queue.removeRequest(&reqinfo)
+ for i := range states {
+ rqueue.requests[reqid].State = states[i]
+ err = rqueue.CloseRequest(reqid)
+ assert.EqualValues(ErrModificationForbidden, err)
+ }
+
+ assert.EqualValues(3, len(rqueue.requests))
+ assert.EqualValues(0, rqueue.queue.length)
+}
+
+func TestSetRequestPriority(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+
+ // Add valid request.
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ legalPrio := req.Priority + 1
+ illegalPrio := 2 * LoPrio
+
+ // Change priority of the request to the same value.
+ err = rqueue.SetRequestPriority(reqid, req.Priority)
+ assert.Nil(err)
+ reqinfo, err := rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(req.Priority, reqinfo.Priority)
+
+ // Change priority of the request.
+ err = rqueue.SetRequestPriority(reqid, legalPrio)
+ assert.Nil(err)
+ reqinfo, err = rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(legalPrio, reqinfo.Priority)
+
+ // Try to change priority of request that doesn't exist.
+ err = rqueue.SetRequestPriority(req.ID+1, legalPrio)
+ assert.Equal(NotFoundError("Request"), err)
+ reqinfo, err = rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(legalPrio, reqinfo.Priority)
+
+ // Try to change priority of request to invalid value.
+ err = rqueue.SetRequestPriority(reqid, illegalPrio)
+ assert.Equal(ErrPriority, err)
+ reqinfo, err = rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(legalPrio, reqinfo.Priority)
+
+ // Try to change priority of request which is in state that forbid changes.
+ rqueue.requests[reqid].State = INVALID
+ err = rqueue.SetRequestPriority(reqid, legalPrio)
+ assert.EqualValues(ErrModificationForbidden, err)
+ reqinfo, err = rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(legalPrio, reqinfo.Priority)
+}
+
+func TestSetRequestValidAfter(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ // Set legal ValidAfter value.
+ err = rqueue.SetRequestValidAfter(reqid, tomorrow)
+ assert.Nil(err)
+ assert.Equal(tomorrow, rqueue.requests[reqid].ValidAfter)
+
+ // Try to set ValidAfter for non-existent request.
+ err = rqueue.SetRequestValidAfter(ReqID(2), tomorrow)
+ assert.Equal(NotFoundError("Request"), err)
+
+ // Try to set ValidAfter newer then Deadline.
+ rqueue.requests[reqid].Deadline = now
+ err = rqueue.SetRequestValidAfter(reqid, tomorrow)
+ assert.Equal(ErrInvalidTimeRange, err)
+
+ // Try to set ValidAfter for request which cannot be modified.
+ rqueue.requests[reqid].State = INVALID
+ err = rqueue.SetRequestValidAfter(reqid, yesterday)
+ assert.EqualValues(ErrModificationForbidden, err)
+}
+
+func TestSetRequestDeadline(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ // Set legal Deadline value.
+ dayAfter := tomorrow.AddDate(0, 0, 1).UTC()
+ err = rqueue.SetRequestDeadline(reqid, dayAfter)
+ assert.Nil(err)
+ assert.Equal(dayAfter, rqueue.requests[reqid].Deadline)
+
+ // Try to set Deadline for non-existent request.
+ err = rqueue.SetRequestDeadline(ReqID(2), tomorrow)
+ assert.Equal(NotFoundError("Request"), err)
+
+ // Try to set Deadline in the past.
+ err = rqueue.SetRequestDeadline(reqid, yesterday)
+ assert.Equal(ErrDeadlineInThePast, err)
+
+ // Try to set Deadline before ValidAfter.
+ rqueue.requests[reqid].ValidAfter = dayAfter
+ err = rqueue.SetRequestDeadline(reqid, tomorrow)
+ assert.Equal(ErrInvalidTimeRange, err)
+
+ // Try to set Deadline for request which cannot be modified.
+ rqueue.requests[reqid].State = INVALID
+ err = rqueue.SetRequestDeadline(reqid, tomorrow)
+ assert.EqualValues(ErrModificationForbidden, err)
+}
+
+func TestGetRequestInfo(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+ req.Job = nil
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ // Get request information for existing request.
+ reqUpdate, err := rqueue.GetRequestInfo(reqid)
+ assert.Nil(err)
+ assert.Equal(req, reqUpdate)
+
+ // Try to get information for non-existent request.
+ req3, err := rqueue.GetRequestInfo(ReqID(2))
+ assert.Equal(NotFoundError("Request"), err)
+ assert.Zero(req3)
+}
+
+type reqFilter struct {
+ state string
+ priority string
+}
+
+func (filter *reqFilter) Match(req *ReqInfo) bool {
+ if req == nil {
+ return false
+ }
+
+ if filter.state != "" && string(req.State) != filter.state {
+ return false
+ }
+
+ priority := strconv.FormatUint(uint64(req.Priority), 10)
+ if filter.priority != "" && priority != filter.priority {
+ return false
+ }
+
+ return true
+}
+
+func TestListRequests(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+ const reqsCnt = 4
+
+ // Add few requests.
+ reqs := make(map[ReqID]bool, reqsCnt)
+ noReqs := make(map[ReqID]bool)
+ for i := 0; i < reqsCnt; i++ {
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+ if i%2 == 1 {
+ rqueue.requests[reqid].Priority++
+ }
+ if i > 1 {
+ rqueue.requests[reqid].State = DONE
+ }
+ reqs[reqid] = true
+ }
+
+ notFoundPrio := req.Priority - 1
+ notFoundState := INVALID
+ var filterTests = [...]struct {
+ filter reqFilter
+ result map[ReqID]bool
+ }{
+ {
+ filter: reqFilter{
+ state: string(WAIT),
+ priority: strconv.FormatUint(uint64(req.Priority), 10),
+ },
+ result: map[ReqID]bool{ReqID(1): true},
+ },
+ {
+ filter: reqFilter{
+ state: string(WAIT),
+ priority: strconv.FormatUint(uint64(req.Priority+1), 10),
+ },
+ result: map[ReqID]bool{ReqID(2): true},
+ },
+ {
+ filter: reqFilter{
+ state: string(DONE),
+ priority: strconv.FormatUint(uint64(req.Priority), 10),
+ },
+ result: map[ReqID]bool{ReqID(3): true},
+ },
+ {
+ filter: reqFilter{
+ state: string(DONE),
+ priority: strconv.FormatUint(uint64(req.Priority+1), 10),
+ },
+ result: map[ReqID]bool{ReqID(4): true},
+ },
+ {
+ filter: reqFilter{
+ state: "",
+ priority: strconv.FormatUint(uint64(req.Priority), 10),
+ },
+ result: map[ReqID]bool{ReqID(1): true, ReqID(3): true},
+ },
+ {
+ filter: reqFilter{
+ state: "",
+ priority: strconv.FormatUint(uint64(req.Priority+1), 10),
+ },
+ result: map[ReqID]bool{ReqID(2): true, ReqID(4): true},
+ },
+ {
+ filter: reqFilter{
+ state: string(WAIT),
+ priority: "",
+ },
+ result: map[ReqID]bool{ReqID(1): true, ReqID(2): true},
+ },
+ {
+ filter: reqFilter{
+ state: string(DONE),
+ priority: "",
+ },
+ result: map[ReqID]bool{ReqID(3): true, ReqID(4): true},
+ },
+ {
+ filter: reqFilter{
+ state: "",
+ priority: "",
+ },
+ result: reqs,
+ },
+ {
+ filter: reqFilter{
+ state: string(notFoundState),
+ priority: strconv.FormatUint(uint64(notFoundPrio), 10),
+ },
+ result: noReqs,
+ },
+ {
+ filter: reqFilter{
+ state: string(WAIT),
+ priority: strconv.FormatUint(uint64(notFoundPrio), 10),
+ },
+ result: noReqs,
+ },
+ {
+ filter: reqFilter{
+ state: string(notFoundState),
+ priority: strconv.FormatUint(uint64(req.Priority), 10),
+ },
+ result: noReqs,
+ },
+ }
+
+ checkReqs := func(reqs map[ReqID]bool, resp []ReqInfo) {
+ assert.Equal(len(reqs), len(resp))
+ for _, req := range resp {
+ assert.True(reqs[req.ID])
+ }
+ }
+
+ for _, test := range filterTests {
+ resp, err := rqueue.ListRequests(&test.filter)
+ assert.Nil(err)
+ checkReqs(test.result, resp)
+ }
+
+ // Nil filter should return all requests.
+ resp, err := rqueue.ListRequests(nil)
+ assert.Nil(err)
+ checkReqs(reqs, resp)
+}
+
+func TestAcquireWorker(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+ empty := AccessInfo{}
+
+ // Add valid request.
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ states := [...]ReqState{WAIT, INVALID, CANCEL, TIMEOUT, DONE, FAILED, INPROGRESS}
+ for _, state := range states {
+ rqueue.requests[reqid].State = state
+ ainfo, err := rqueue.AcquireWorker(reqid)
+ assert.Equal(ErrWorkerNotAssigned, err)
+ assert.Equal(empty, ainfo)
+ }
+
+ // Try to acquire worker for non-existing request.
+ ainfo, err := rqueue.AcquireWorker(ReqID(2))
+ assert.Equal(NotFoundError("Request"), err)
+ assert.Equal(empty, ainfo)
+
+ // AcquireWorker to succeed needs JobInfo to be set. It also needs to be
+ // in INPROGRESS state, which was set in the loop.
+ rqueue.requests[reqid].Job = new(JobInfo)
+ ainfo, err = rqueue.AcquireWorker(reqid)
+ assert.Nil(err)
+ assert.Equal(empty, ainfo)
+}
+
+func TestProlongAccess(t *testing.T) {
+ assert, rqueue := initTest(t)
+ req := requestsTests[0].req
+
+ // Add valid request.
+ reqid, err := rqueue.NewRequest(req.Caps, req.Priority, req.Owner, req.ValidAfter, req.Deadline)
+ assert.Nil(err)
+
+ states := [...]ReqState{WAIT, INVALID, CANCEL, TIMEOUT, DONE, FAILED, INPROGRESS}
+ for _, state := range states {
+ rqueue.requests[reqid].State = state
+ err = rqueue.ProlongAccess(reqid)
+ assert.Equal(ErrWorkerNotAssigned, err)
+ }
+
+ // Try to prolong access of job for non-existing request.
+ err = rqueue.ProlongAccess(ReqID(2))
+ assert.Equal(NotFoundError("Request"), err)
+
+ // ProlongAccess to succeed needs JobInfo to be set. It also needs to be
+ // in INPROGRESS state, which was set in the loop.
+ rqueue.requests[reqid].Job = new(JobInfo)
+ err = rqueue.ProlongAccess(reqid)
+ assert.Nil(err)
+}