diff options
author | Pawel Wieczorek <p.wieczorek2@samsung.com> | 2018-07-03 15:47:50 +0200 |
---|---|---|
committer | Pawel Wieczorek <p.wieczorek2@samsung.com> | 2018-07-03 15:47:50 +0200 |
commit | 8032cffb294d3f9713f54e2ce9e761abef016d2d (patch) | |
tree | f1095aba61b5ced113680a6e7204c0373e91f215 | |
parent | 04ece56c947f40e64cd06c09ad6e34a10d9e7264 (diff) | |
parent | d0992e7f641444e94ec21de8ea9b622f515b7ddf (diff) | |
download | weles-srun.tar.gz weles-srun.tar.bz2 weles-srun.zip |
Merge branch 'artifacts'srun
Change-Id: I7d422c12880842ce45ef7cb83c20eaf3b6b13192
-rw-r--r-- | artifactmanager.go | 3 | ||||
-rw-r--r-- | artifacts/artifacts.go | 147 | ||||
-rw-r--r-- | artifacts/artifacts_suite_test.go | 30 | ||||
-rw-r--r-- | artifacts/artifacts_test.go | 285 | ||||
-rw-r--r-- | artifacts/database/database.go | 211 | ||||
-rw-r--r-- | artifacts/database/database_suite_test.go | 29 | ||||
-rw-r--r-- | artifacts/database/database_test.go | 285 | ||||
-rw-r--r-- | artifacts/database/errors.go | 29 | ||||
-rw-r--r-- | artifacts/downloader/downloader.go | 160 | ||||
-rw-r--r-- | artifacts/downloader/downloader_suite_test.go | 29 | ||||
-rw-r--r-- | artifacts/downloader/downloader_test.go | 243 | ||||
-rw-r--r-- | artifacts/downloader/errors.go | 26 |
12 files changed, 1476 insertions, 1 deletions
diff --git a/artifactmanager.go b/artifactmanager.go index 41e2ff5..cc9058c 100644 --- a/artifactmanager.go +++ b/artifactmanager.go @@ -103,4 +103,7 @@ type ArtifactManager interface { // GetFileInfo retrieves information about an artifact from ArtifactDB. GetArtifactInfo(path ArtifactPath) (ArtifactInfo, error) + + // Close gracefully closes ArtifactManager. + Close() error } diff --git a/artifacts/artifacts.go b/artifacts/artifacts.go index fcae9ec..ad1fc07 100644 --- a/artifacts/artifacts.go +++ b/artifacts/artifacts.go @@ -18,14 +18,159 @@ package artifacts import ( + "io/ioutil" + "os" + "path/filepath" + "strconv" + "time" + . "git.tizen.org/tools/weles" + . "git.tizen.org/tools/weles/artifacts/database" + . "git.tizen.org/tools/weles/artifacts/downloader" ) // ArtifactDownloader downloads requested file if there is need to. type ArtifactDownloader interface { // Download starts downloading requested artifact. - Download(URI ArtifactURI) (ArtifactInfo, error) + Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error // CheckInCache checks if file already exists in ArtifactDB. CheckInCache(URI ArtifactURI) (ArtifactInfo, error) + + // Close waits for all jobs to finish, and gracefully closes ArtifactDownloader. + Close() +} + +// Storage should be used by Weles' subsystems that need access to ArtifactDB +// or information about artifacts stored there. +// Storage implements ArtifactManager interface. +type Storage struct { + ArtifactManager + db ArtifactDB + dir string + downloader ArtifactDownloader + notifier chan ArtifactStatusChange +} + +const ( + // defaultDb is default ArtifactDB name. + defaultDb = "weles.db" + // defaultDir is default directory for ArtifactManager storage. + defaultDir = "/tmp/weles/" + // notifierCap is default notifier channel capacity. + notifierCap = 100 + // workersCount is default number of workers. + workersCount = 16 +) + +func newArtifactManager(db, dir string) (ArtifactManager, error) { + err := os.MkdirAll(dir, os.ModePerm) + if err != nil { + return nil, err + } + notifier := make(chan ArtifactStatusChange, notifierCap) + + am := Storage{ + dir: dir, + downloader: NewDownloader(notifier, workersCount), + notifier: notifier, + } + err = am.db.Open(db) + if err != nil { + return nil, err + } + + go am.listenToChanges() + + return &am, nil +} + +// NewArtifactManager returns initialized Storage implementing ArtifactManager interface. +// If db or dir is empy, default value will be used. +func NewArtifactManager(db, dir string) (ArtifactManager, error) { + if db == "" { + db = defaultDb + } + if dir == "" { + dir = defaultDir + } + return newArtifactManager(filepath.Join(dir, db), dir) +} + +// ListArtifact is part of implementation of ArtifactManager interface. +func (s *Storage) ListArtifact(filter ArtifactFilter) ([]ArtifactInfo, error) { + return s.db.Filter(filter) +} + +// PushArtifact is part of implementation of ArtifactManager interface. +func (s *Storage) PushArtifact(artifact ArtifactDescription, ch chan ArtifactStatusChange) (ArtifactPath, error) { + path, err := s.CreateArtifact(artifact) + if err != nil { + return "", err + } + + err = s.downloader.Download(artifact.URI, path, ch) + if err != nil { + s.db.SetStatus(ArtifactStatusChange{path, AM_FAILED}) + return "", err + } + return path, nil +} + +// CreateArtifact is part of implementation of ArtifactManager interface. +func (s *Storage) CreateArtifact(artifact ArtifactDescription) (ArtifactPath, error) { + path, err := s.getNewPath(artifact) + if err != nil { + return "", err + } + + err = s.db.InsertArtifactInfo(&ArtifactInfo{artifact, path, "", time.Now().UTC()}) + if err != nil { + return "", err + } + return path, nil +} + +// GetArtifactInfo is part of implementation of ArtifactManager interface. +func (s *Storage) GetArtifactInfo(path ArtifactPath) (ArtifactInfo, error) { + return s.db.SelectPath(path) +} + +// Close closes Storage's ArtifactDB. +func (s *Storage) Close() error { + s.downloader.Close() + close(s.notifier) + return s.db.Close() +} + +// getNewPath prepares new path for artifact. +func (s *Storage) getNewPath(ad ArtifactDescription) (ArtifactPath, error) { + var ( + jobDir = filepath.Join(s.dir, strconv.FormatUint(uint64(ad.JobID), 10)) + typeDir = filepath.Join(jobDir, string(ad.Type)) + err error + ) + + // Organize by filetypes + err = os.MkdirAll(typeDir, os.ModePerm) + if err != nil { + return "", err + } + + // Add human readable prefix + f, err := ioutil.TempFile(typeDir, string(ad.Alias)) + if err != nil { + return "", err + } + defer f.Close() + return ArtifactPath(f.Name()), err +} + +// listenToChanges updates artifact's status in db every time Storage is notified +// about status change. +func (s *Storage) listenToChanges() { + for change := range s.notifier { + // TODO handle errors returned by SetStatus + s.db.SetStatus(change) + } } diff --git a/artifacts/artifacts_suite_test.go b/artifacts/artifacts_suite_test.go new file mode 100644 index 0000000..f11caa6 --- /dev/null +++ b/artifacts/artifacts_suite_test.go @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package artifacts is responsible for Weles system's job artifact management. +package artifacts + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestArtifacts(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Artifacts Suite") +} diff --git a/artifacts/artifacts_test.go b/artifacts/artifacts_test.go new file mode 100644 index 0000000..0336835 --- /dev/null +++ b/artifacts/artifacts_test.go @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package artifacts is responsible for Weles system's job artifact management. +package artifacts + +import ( + "database/sql" + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "strconv" + + "git.tizen.org/tools/weles" + + _ "github.com/mattn/go-sqlite3" + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" +) + +var _ = Describe("ArtifactManager", func() { + + const poem = `How doth the little crocodile +Improve his shining tail, +And pour the waters of the Nile +On every golden scale! + +How cheerfully he seems to grin +How neatly spreads his claws, +And welcomes little fishes in, +With gently smiling jaws! + +-Lewis Carroll` + + var ( + testDir string + dbPath string + err error + ) + + var ( + silverKangaroo weles.ArtifactManager + job weles.JobID = 58008 + validURL weles.ArtifactURI = "validURL" + invalidURL weles.ArtifactURI = "invalidURL" + ) + + var ( + description = weles.ArtifactDescription{ + job, + weles.AM_IMAGEFILE, + "alias", + "uri", + } + + dSameJobNType = weles.ArtifactDescription{ + job, + weles.AM_IMAGEFILE, + "other alias", + "other uri", + } + + dSameJobOtherType = weles.ArtifactDescription{ + job, + weles.AM_YAMLFILE, + "another alias", + "another uri", + } + ) + + BeforeEach(func() { + testDir, err = ioutil.TempDir("", "test-weles-") + Expect(err).ToNot(HaveOccurred()) + dbPath = filepath.Join(testDir, "test.db") + + silverKangaroo, err = newArtifactManager(dbPath, testDir) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + err := os.RemoveAll(testDir) + Expect(err).ToNot(HaveOccurred()) + err = silverKangaroo.Close() + Expect(err).ToNot(HaveOccurred()) + }) + + checkPathInDb := func(path weles.ArtifactPath) bool { + db, err := sql.Open("sqlite3", dbPath) + Expect(err).ToNot(HaveOccurred()) + defer db.Close() + var n int + err = db.QueryRow("select count (*) from artifacts where path = ?", path).Scan(&n) + Expect(err).ToNot(HaveOccurred()) + return (n > 0) + } + + prepareServer := func(url weles.ArtifactURI) *httptest.Server { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if url == validURL { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, poem) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + return ts + } + + It("should create new temp directory for artifacts", func() { + var path, pathSame, pathType weles.ArtifactPath + + jobDir := filepath.Join(testDir, strconv.Itoa(int(description.JobID))) + typeDir := filepath.Join(jobDir, string(description.Type)) + newTypeDir := filepath.Join(jobDir, string(dSameJobOtherType.Type)) + + Expect(jobDir).ToNot(BeADirectory()) + + By("CreateArtifact", func() { + path, err = silverKangaroo.CreateArtifact(description) + Expect(err).ToNot(HaveOccurred()) + Expect(path).NotTo(BeNil()) + }) + + By("Check if all subdirs, and new file exists", func() { + Expect(jobDir).To(BeADirectory()) + Expect(typeDir).To(BeADirectory()) + Expect(string(path)).To(BeAnExistingFile()) + Expect(string(path)).To(ContainSubstring(string(description.Alias))) + }) + + By("Add new artifact for the same JobID", func() { + pathSame, err = silverKangaroo.CreateArtifact(dSameJobNType) + Expect(err).ToNot(HaveOccurred()) + + Expect(jobDir).To(BeADirectory()) + Expect(typeDir).To(BeADirectory()) + + Expect(string(pathSame)).To(BeAnExistingFile()) + Expect(string(pathSame)).To(ContainSubstring(string(dSameJobNType.Alias))) + }) + + By("Add artifact with other type for the same JobID", func() { + pathType, err = silverKangaroo.CreateArtifact(dSameJobOtherType) + + Expect(err).ToNot(HaveOccurred()) + Expect(jobDir).To(BeADirectory()) + Expect(newTypeDir).To(BeADirectory()) + + Expect(string(pathType)).To(BeAnExistingFile()) + Expect(string(pathType)).To(ContainSubstring(string(dSameJobOtherType.Alias))) + }) + + paths := []weles.ArtifactPath{path, pathSame, pathType} + By("Check if artifact with path is in ArtifactDB", func() { + db, err := sql.Open("sqlite3", dbPath) + Expect(err).ToNot(HaveOccurred()) + var n int + for _, p := range paths { + err = db.QueryRow("select count (*) from artifacts where path = ?", p).Scan(&n) + Expect(err).ToNot(HaveOccurred()) + Expect(n).NotTo(BeZero()) + } + }) + + By("Check if it's possible to GetFileInfo", func() { + for _, p := range paths { + Expect(checkPathInDb(p)).To(BeTrue()) + } + }) + }) + + Describe("Public initializer", func() { + var ( + defaultDb = "weles.db" + defaultDir = "/tmp/weles/" + customDb = "nawia.db" + customDir = "/tmp/weles-custom/" + ) + + DescribeTable("NewArtifactManager()", func(db, dir string) { + copperPanda, err := NewArtifactManager(db, dir) + Expect(err).ToNot(HaveOccurred()) + + if db == "" { + db = defaultDb + } + if dir == "" { + dir = defaultDir + } + + Expect(dir).To(BeADirectory()) + Expect(filepath.Join(dir, db)).To(BeAnExistingFile()) + + err = copperPanda.Close() + Expect(err).ToNot(HaveOccurred()) + + err = os.RemoveAll(dir) + Expect(err).ToNot(HaveOccurred()) + }, + Entry("create database in default directory", defaultDb, defaultDir), + Entry("create database in default directory, when arguments are empty", "", ""), + Entry("create database in custom directory", customDb, customDir), + ) + }) + + Describe("PushArtifact", func() { + + var ( + ch chan weles.ArtifactStatusChange + + ad weles.ArtifactDescription = weles.ArtifactDescription{ + job, + weles.AM_IMAGEFILE, + "somealias", + validURL, + } + + adInvalid weles.ArtifactDescription = weles.ArtifactDescription{ + job, + weles.AM_IMAGEFILE, + "somealias", + invalidURL, + } + ) + + BeforeEach(func() { + ch = make(chan weles.ArtifactStatusChange, 20) + }) + + DescribeTable("Push artifact", + func(ad weles.ArtifactDescription, finalStatus weles.ArtifactStatus) { + + ts := prepareServer(ad.URI) + defer ts.Close() + ad.URI = weles.ArtifactURI(ts.URL) + + path, err := silverKangaroo.PushArtifact(ad, ch) + + Expect(err).ToNot(HaveOccurred()) + + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING}))) + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING}))) + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, finalStatus}))) + + if finalStatus != weles.AM_FAILED { + By("Check if file exists and has proper content") + + content, err := ioutil.ReadFile(string(path)) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(BeIdenticalTo(poem)) + + } else { + By("Check if file exists") + Expect(string(path)).NotTo(BeAnExistingFile()) + } + + ai, err := silverKangaroo.GetArtifactInfo(path) + Expect(err).ToNot(HaveOccurred()) + Expect(ai.Status).To(Equal(finalStatus)) + + By("Check if artifact is in ArtifactDB") + Expect(checkPathInDb(path)).To(BeTrue()) + }, + Entry("push artifact to db and download file", ad, weles.AM_READY), + Entry("do not push an invalid artifact", adInvalid, weles.AM_FAILED), + ) + }) +}) diff --git a/artifacts/database/database.go b/artifacts/database/database.go new file mode 100644 index 0000000..7049c9c --- /dev/null +++ b/artifacts/database/database.go @@ -0,0 +1,211 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package database is responsible for Weles system's job artifact storage. +package database + +import ( + "database/sql" + "strings" + + . "git.tizen.org/tools/weles" + + "github.com/go-gorp/gorp" + // sqlite3 is imported for side-effects and will be used + // with the standard library sql interface. + _ "github.com/mattn/go-sqlite3" +) + +type artifactInfoRecord struct { + ID int64 `db:",primarykey, autoincrement"` + ArtifactInfo +} + +// ArtifactDB is responsible for database connection and queries. +type ArtifactDB struct { + handler *sql.DB + dbmap *gorp.DbMap +} + +// Open opens database connection. +func (aDB *ArtifactDB) Open(dbPath string) error { + var err error + aDB.handler, err = sql.Open("sqlite3", dbPath) + if err != nil { + return err + } + + aDB.dbmap = &gorp.DbMap{Db: aDB.handler, Dialect: gorp.SqliteDialect{}} + return aDB.initDB() +} + +// initDB initializes tables. +func (aDB *ArtifactDB) initDB() error { + // Add tables. + aDB.dbmap.AddTableWithName(artifactInfoRecord{}, "artifacts").SetKeys(true, "ID") + + return aDB.dbmap.CreateTablesIfNotExists() +} + +// Close closes the database. +func (aDB *ArtifactDB) Close() error { + return aDB.handler.Close() +} + +// InsertArtifactInfo inserts information about artifact to database. +func (aDB *ArtifactDB) InsertArtifactInfo(ai *ArtifactInfo) error { + ar := artifactInfoRecord{ + ArtifactInfo: *ai, + } + return aDB.dbmap.Insert(&ar) +} + +// SelectPath selects artifact from database based on its path. +func (aDB *ArtifactDB) SelectPath(path ArtifactPath) (ArtifactInfo, error) { + ar := artifactInfoRecord{} + err := aDB.dbmap.SelectOne(&ar, "select * from artifacts where Path=?", path) + if err != nil { + return ArtifactInfo{}, err + } + return ar.ArtifactInfo, nil +} + +// prepareQuery prepares query based on given filter. +// TODO code duplication +func prepareQuery(filter ArtifactFilter) (string, []interface{}) { + var ( + conditions []string + query = "select * from artifacts " + args []interface{} + ) + if len(filter.JobID) > 0 { + q := make([]string, len(filter.JobID)) + for i, job := range filter.JobID { + q[i] = "?" + args = append(args, job) + } + conditions = append(conditions, " JobID in ("+strings.Join(q, ",")+")") + } + if len(filter.Type) > 0 { + q := make([]string, len(filter.Type)) + for i, typ := range filter.Type { + q[i] = "?" + args = append(args, typ) + } + conditions = append(conditions, " Type in ("+strings.Join(q, ",")+")") + } + if len(filter.Status) > 0 { + q := make([]string, len(filter.Status)) + for i, status := range filter.Status { + q[i] = "?" + args = append(args, status) + } + conditions = append(conditions, " Status in ("+strings.Join(q, ",")+")") + } + if len(filter.Alias) > 0 { + q := make([]string, len(filter.Alias)) + for i, alias := range filter.Alias { + q[i] = "?" + args = append(args, alias) + } + conditions = append(conditions, " Alias in ("+strings.Join(q, ",")+")") + } + if len(conditions) > 0 { + query += " where " + strings.Join(conditions, " AND ") + } + return query, args +} + +// Filter fetches elements matching ArtifactFilter from database. +func (aDB *ArtifactDB) Filter(filter ArtifactFilter) ([]ArtifactInfo, error) { + results := []artifactInfoRecord{} + + query, args := prepareQuery(filter) + + // TODO gorp doesn't support passing list of arguments to where in(...) clause yet. + // Thats why it's done with the use prepareQuery. + _, err := aDB.dbmap.Select(&results, query, args...) + if err != nil { + return nil, err + } + artifacts := make([]ArtifactInfo, len(results)) + for i, res := range results { + artifacts[i] = res.ArtifactInfo + } + return artifacts, nil + +} + +// Select fetches artifacts from ArtifactDB. +func (aDB *ArtifactDB) Select(arg interface{}) (artifacts []ArtifactInfo, err error) { + var ( + results []artifactInfoRecord + query string + ) + // TODO prepare efficient way of executing generic select. + switch arg.(type) { + case JobID: + query = "select * from artifacts where JobID = ?" + case ArtifactType: + query = "select * from artifacts where Type = ?" + case ArtifactAlias: + query = "select * from artifacts where Alias = ?" + case ArtifactStatus: + query = "select * from artifacts where Status = ?" + default: + return nil, ErrUnsupportedQueryType + } + + _, err = aDB.dbmap.Select(&results, query, arg) + if err != nil { + return nil, err + } + artifacts = make([]ArtifactInfo, len(results)) + for i, res := range results { + artifacts[i] = res.ArtifactInfo + } + return artifacts, nil +} + +// getID fetches ID of an artifact with provided path. +func (aDB *ArtifactDB) getID(path ArtifactPath) (int64, error) { + res, err := aDB.dbmap.SelectInt("select ID from artifacts where Path=?", path) + if err != nil { + return 0, err + } + return res, nil +} + +// SetStatus changes artifact's status in ArtifactDB. +func (aDB *ArtifactDB) SetStatus(change ArtifactStatusChange) error { + ai, err := aDB.SelectPath(change.Path) + if err != nil { + return err + } + ar := artifactInfoRecord{ + ArtifactInfo: ai, + } + + id, err := aDB.getID(ar.Path) + if err != nil { + return err + } + ar.ID = id + + ar.Status = change.NewStatus + _, err = aDB.dbmap.Update(&ar) + return err +} diff --git a/artifacts/database/database_suite_test.go b/artifacts/database/database_suite_test.go new file mode 100644 index 0000000..55dbe8c --- /dev/null +++ b/artifacts/database/database_suite_test.go @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package database_test + +import ( + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + "testing" +) + +func TestDatabase(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Database Suite") +} diff --git a/artifacts/database/database_test.go b/artifacts/database/database_test.go new file mode 100644 index 0000000..5f4dff7 --- /dev/null +++ b/artifacts/database/database_test.go @@ -0,0 +1,285 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package database is responsible for Weles system's job artifact storage. +package database + +import ( + "database/sql" + "io/ioutil" + "os" + "path/filepath" + "time" + + "git.tizen.org/tools/weles" + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" +) + +var _ = Describe("ArtifactDB", func() { + var ( + job weles.JobID = 58008 + invalidJob weles.JobID = 1 + invalidPath weles.ArtifactPath = "invalidPath" + invalidStatus weles.ArtifactStatus = "invalidStatus" + invalidType weles.ArtifactType = "invalidType" + invalidAlias weles.ArtifactAlias = "invalidAlias" + goldenUnicorn ArtifactDB + tmpDir string + + artifact = weles.ArtifactInfo{ + weles.ArtifactDescription{ + job, + weles.AM_IMAGEFILE, + "some alias", + "http://example.com", + }, + "path1", + weles.AM_PENDING, + time.Now().UTC(), + } + + aImageReady = weles.ArtifactInfo{ + weles.ArtifactDescription{ + job + 1, + weles.AM_IMAGEFILE, + "other alias", + "http://example.com/1", + }, + "path2", + weles.AM_READY, + time.Now().UTC(), + } + + aYamlFailed = weles.ArtifactInfo{ + weles.ArtifactDescription{ + job + 1, + weles.AM_YAMLFILE, + "other alias", + "http://example.com/2", + }, + "path3", + weles.AM_FAILED, + time.Now().UTC(), + } + + aTestFailed = weles.ArtifactInfo{ + weles.ArtifactDescription{ + job + 2, + weles.AM_TESTFILE, + "alias", + "http://example.com/2", + }, + "path4", + weles.AM_FAILED, + time.Unix(3000, 60).UTC(), + } + + testArtifacts = []weles.ArtifactInfo{artifact, aImageReady, aYamlFailed, aTestFailed} + + oneJobFilter = weles.ArtifactFilter{[]weles.JobID{artifact.JobID}, nil, nil, nil} + twoJobsFilter = weles.ArtifactFilter{[]weles.JobID{artifact.JobID, aImageReady.JobID}, nil, nil, nil} + noJobFilter = weles.ArtifactFilter{[]weles.JobID{invalidJob}, nil, nil, nil} + + oneTypeFilter = weles.ArtifactFilter{nil, []weles.ArtifactType{aYamlFailed.Type}, nil, nil} + twoTypesFilter = weles.ArtifactFilter{nil, []weles.ArtifactType{aYamlFailed.Type, aTestFailed.Type}, nil, nil} + noTypeFilter = weles.ArtifactFilter{nil, []weles.ArtifactType{invalidType}, nil, nil} + + oneStatusFilter = weles.ArtifactFilter{nil, nil, []weles.ArtifactStatus{artifact.Status}, nil} + twoStatusFilter = weles.ArtifactFilter{nil, nil, []weles.ArtifactStatus{artifact.Status, aYamlFailed.Status}, nil} + noStatusFilter = weles.ArtifactFilter{nil, nil, []weles.ArtifactStatus{invalidStatus}, nil} + + oneAliasFilter = weles.ArtifactFilter{nil, nil, nil, []weles.ArtifactAlias{artifact.Alias}} + twoAliasFilter = weles.ArtifactFilter{nil, nil, nil, []weles.ArtifactAlias{artifact.Alias, aImageReady.Alias}} + noAliasFilter = weles.ArtifactFilter{nil, nil, nil, []weles.ArtifactAlias{invalidAlias}} + + fullFilter = weles.ArtifactFilter{twoJobsFilter.JobID, twoTypesFilter.Type, twoStatusFilter.Status, twoAliasFilter.Alias} + noMatchFilter = weles.ArtifactFilter{oneJobFilter.JobID, oneTypeFilter.Type, nil, nil} + emptyFilter = weles.ArtifactFilter{} + ) + + jobsInDB := func(job weles.JobID) int64 { + n, err := goldenUnicorn.dbmap.SelectInt(`SELECT COUNT(*) + FROM artifacts + WHERE JobID = ?`, job) + Expect(err).ToNot(HaveOccurred()) + return n + } + + BeforeEach(func() { + var err error + tmpDir, err = ioutil.TempDir("", "weles-") + Expect(err).ToNot(HaveOccurred()) + err = goldenUnicorn.Open(filepath.Join(tmpDir, "test.db")) + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + err := goldenUnicorn.Close() + Expect(err).ToNot(HaveOccurred()) + err = os.RemoveAll(tmpDir) + Expect(err).ToNot(HaveOccurred()) + }) + + It("should open new database, with artifact table", func() { + n, err := goldenUnicorn.dbmap.SelectInt(`SELECT COUNT(*) + FROM sqlite_master + WHERE name = 'artifacts' + AND type = 'table'`) + Expect(err).ToNot(HaveOccurred()) + Expect(n).To(BeNumerically("==", 1)) + }) + + It("should fail to open database on invalid path", func() { + // sql.Open only validates arguments. + // db.Ping must be called to check the connection. + invalidDatabasePath := filepath.Join(tmpDir, "invalid", "test.db") + err := goldenUnicorn.Open(invalidDatabasePath) + Expect(err).To(HaveOccurred()) + Expect(invalidDatabasePath).ToNot(BeAnExistingFile()) + }) + + It("should insert new artifact to database", func() { + Expect(jobsInDB(job)).To(BeNumerically("==", 0)) + + err := goldenUnicorn.InsertArtifactInfo(&artifact) + Expect(err).ToNot(HaveOccurred()) + + Expect(jobsInDB(artifact.JobID)).To(BeNumerically("==", 1)) + }) + + Describe("SelectPath", func() { + + BeforeEach(func() { + err := goldenUnicorn.InsertArtifactInfo(&artifact) + Expect(err).ToNot(HaveOccurred()) + + Expect(jobsInDB(artifact.JobID)).To(BeNumerically("==", 1)) + }) + + DescribeTable("database selectpath", + func(path weles.ArtifactPath, expectedErr error, expectedArtifact weles.ArtifactInfo) { + result, err := goldenUnicorn.SelectPath(path) + + if expectedErr != nil { + Expect(err).To(Equal(expectedErr)) + } else { + Expect(err).ToNot(HaveOccurred()) + } + Expect(result).To(Equal(expectedArtifact)) + }, + Entry("retrieve artifact based on path", artifact.Path, nil, artifact), + Entry("retrieve artifact based on invalid path", invalidPath, sql.ErrNoRows, weles.ArtifactInfo{}), + ) + }) + + Describe("Select", func() { + + BeforeEach(func() { + for _, a := range testArtifacts { + err := goldenUnicorn.InsertArtifactInfo(&a) + Expect(err).ToNot(HaveOccurred()) + } + }) + + DescribeTable("database select", + func(lookedFor interface{}, expectedErr error, expectedResult ...weles.ArtifactInfo) { + result, err := goldenUnicorn.Select(lookedFor) + + if expectedErr != nil { + Expect(err).To(Equal(expectedErr)) + Expect(result).To(BeNil()) + } else { + Expect(err).ToNot(HaveOccurred()) + Expect(result).To(Equal(expectedResult)) + } + }, + // types supported by select. + Entry("select JobID", artifact.JobID, nil, artifact), + Entry("select Type", weles.AM_YAMLFILE, nil, aYamlFailed), + Entry("select Status", weles.AM_READY, nil, aImageReady), + Entry("select Alias", artifact.Alias, nil, artifact), + // type bool is not supported by select. + Entry("select unsupported value", true, ErrUnsupportedQueryType), + // test query itsef. + Entry("select multiple entries for JobID", aImageReady.JobID, nil, aImageReady, aYamlFailed), + Entry("select no entries for invalid JobID", invalidJob, nil), + Entry("select multiple entries for Type", weles.AM_IMAGEFILE, nil, artifact, aImageReady), + Entry("select multiple entries for Alias", aImageReady.Alias, nil, aImageReady, aYamlFailed), + Entry("select multiple entries for Status", weles.AM_FAILED, nil, aYamlFailed, aTestFailed), + ) + }) + + Describe("List", func() { + BeforeEach(func() { + for _, a := range testArtifacts { + err := goldenUnicorn.InsertArtifactInfo(&a) + Expect(err).ToNot(HaveOccurred()) + } + }) + DescribeTable("list artifacts matching filter", + func(filter weles.ArtifactFilter, expected ...weles.ArtifactInfo) { + results, err := goldenUnicorn.Filter(filter) + Expect(err).ToNot(HaveOccurred()) + Expect(results).To(ConsistOf(expected)) + }, + Entry("filter one JobID", oneJobFilter, artifact), + Entry("filter more than one JobIDs", twoJobsFilter, artifact, aImageReady, aYamlFailed), + Entry("filter JobID not in db", noJobFilter), + Entry("filter one Type", oneTypeFilter, aYamlFailed), + Entry("filter more than one Type", twoTypesFilter, aYamlFailed, aTestFailed), + Entry("filter Type not in db", noTypeFilter), + Entry("filter one Status", oneStatusFilter, artifact), + Entry("filter more than one Status", twoStatusFilter, artifact, aTestFailed, aYamlFailed), + Entry("filter Status not in db", noStatusFilter), + Entry("filter one Alias", oneAliasFilter, artifact), + Entry("filter more than one Alias", twoAliasFilter, artifact, aImageReady, aYamlFailed), + Entry("filter Alias not in db", noAliasFilter), + Entry("filter is completly set up", fullFilter, aYamlFailed), + Entry("no artifact in db matches filter", noMatchFilter), + Entry("filter is empty", emptyFilter, artifact, aImageReady, aYamlFailed, aTestFailed), + ) + }) + Describe("SetStatus", func() { + BeforeEach(func() { + for _, a := range testArtifacts { + err := goldenUnicorn.InsertArtifactInfo(&a) + Expect(err).ToNot(HaveOccurred()) + } + }) + DescribeTable("artifact status change", + func(change weles.ArtifactStatusChange, expectedErr error) { + + err := goldenUnicorn.SetStatus(change) + if expectedErr == nil { + Expect(err).ToNot(HaveOccurred()) + + a, err := goldenUnicorn.SelectPath(change.Path) + Expect(err).ToNot(HaveOccurred()) + Expect(a.Status).To(Equal(change.NewStatus)) + } else { + Expect(err).To(Equal(expectedErr)) + a, err := goldenUnicorn.SelectPath(change.Path) + Expect(err).To(HaveOccurred()) + Expect(a).To(Equal(weles.ArtifactInfo{})) + } + }, + Entry("change status of artifact not present in ArtifactDB", weles.ArtifactStatusChange{invalidPath, weles.AM_DOWNLOADING}, sql.ErrNoRows), + Entry("change status of artifact present in ArtifactDB", weles.ArtifactStatusChange{artifact.Path, weles.AM_DOWNLOADING}, nil), + ) + }) +}) diff --git a/artifacts/database/errors.go b/artifacts/database/errors.go new file mode 100644 index 0000000..7f2d8a4 --- /dev/null +++ b/artifacts/database/errors.go @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// File errors.go provides definitions of errors for Weles' database package. + +package database + +import ( + "errors" +) + +var ( + // ErrUnsupportedQueryType is returned when wrong type of argument is passed to + // ArtifactDB's Select(). + ErrUnsupportedQueryType = errors.New("unsupported argument type") +) diff --git a/artifacts/downloader/downloader.go b/artifacts/downloader/downloader.go new file mode 100644 index 0000000..2edf024 --- /dev/null +++ b/artifacts/downloader/downloader.go @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// Package downloader is responsible for Weles system's job artifact downloading. +package downloader + +import ( + "fmt" + "io" + "net/http" + "os" + "sync" + + . "git.tizen.org/tools/weles" +) + +// Downloader implements ArtifactDownloader interface. +type Downloader struct { + notification chan ArtifactStatusChange // can be used to monitor ArtifactStatusChanges. + queue chan downloadJob + wg sync.WaitGroup +} + +// downloadJob provides necessary info for download to be done. +type downloadJob struct { + path ArtifactPath + uri ArtifactURI + ch chan ArtifactStatusChange +} + +// queueCap is the default length of download queue. +const queueCap = 100 + +// newDownloader returns initilized Downloader. +func newDownloader(notification chan ArtifactStatusChange, workers int, queueSize int) *Downloader { + + d := &Downloader{ + notification: notification, + queue: make(chan downloadJob, queueSize), + } + + // Start all workers. + d.wg.Add(workers) + for i := 0; i < workers; i++ { + go d.work() + } + return d +} + +// NewDownloader returns Downloader initialized with default queue length +func NewDownloader(notification chan ArtifactStatusChange, workerCount int) *Downloader { + return newDownloader(notification, workerCount, queueCap) +} + +// Close is part of implementation of ArtifactDownloader interface. +// It waits for running download jobs to stop and closes used channels. +func (d *Downloader) Close() { + close(d.queue) + d.wg.Wait() +} + +// getData downloads file from provided location and saves it in a prepared path. +func (d *Downloader) getData(URI ArtifactURI, path ArtifactPath) error { + resp, err := http.Get(string(URI)) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("server error %v %v", URI, resp.Status) + } + + file, err := os.Create(string(path)) + if err != nil { + return err + } + defer file.Close() + + _, err = io.Copy(file, resp.Body) + return err +} + +// download downloads artifact from provided URI and saves it to specified path. +// It sends notification about status changes to two channels - Downloader's notification +// channel, and other one, that can be specified passed as an argument. +func (d *Downloader) download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) { + if path == "" { + return + } + + change := ArtifactStatusChange{ + Path: path, + NewStatus: AM_DOWNLOADING, + } + channels := []chan ArtifactStatusChange{ch, d.notification} + notify(change, channels) + + err := d.getData(URI, path) + if err != nil { + os.Remove(string(path)) + change.NewStatus = AM_FAILED + } else { + change.NewStatus = AM_READY + } + notify(change, channels) +} + +// Download is part of implementation of ArtifactDownloader interface. +// It puts new downloadJob on the queue. +func (d *Downloader) Download(URI ArtifactURI, path ArtifactPath, ch chan ArtifactStatusChange) error { + channels := []chan ArtifactStatusChange{ch, d.notification} + notify(ArtifactStatusChange{path, AM_PENDING}, channels) + + job := downloadJob{ + path: path, + uri: URI, + ch: ch, + } + + select { + case d.queue <- job: + default: + return ErrQueueFull + } + return nil +} + +func (d *Downloader) work() { + defer d.wg.Done() + for job := range d.queue { + d.download(job.uri, job.path, job.ch) + } +} + +// CheckInCache is part of implementation of ArtifactDownloader interface. +// TODO implement. +func (d *Downloader) CheckInCache(URI ArtifactURI) (ArtifactInfo, error) { + return ArtifactInfo{}, ErrNotImplemented +} + +// notify sends ArtifactStatusChange to all specified channels. +func notify(change ArtifactStatusChange, channels []chan ArtifactStatusChange) { + for _, ch := range channels { + ch <- change + } +} diff --git a/artifacts/downloader/downloader_suite_test.go b/artifacts/downloader/downloader_suite_test.go new file mode 100644 index 0000000..d866229 --- /dev/null +++ b/artifacts/downloader/downloader_suite_test.go @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package downloader + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestDownloader(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Downloader Suite") +} diff --git a/artifacts/downloader/downloader_test.go b/artifacts/downloader/downloader_test.go new file mode 100644 index 0000000..c6537b3 --- /dev/null +++ b/artifacts/downloader/downloader_test.go @@ -0,0 +1,243 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +package downloader + +import ( + "fmt" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + + "git.tizen.org/tools/weles" + . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" + . "github.com/onsi/gomega" +) + +var _ = Describe("Downloader", func() { + + const pigs = `The pig, if I am not mistaken, +Supplies us sausage, ham, and bacon. +Let others say his heart is big -- +I call it stupid of the pig. + +-Ogden Nash` + + var platinumKoala *Downloader + + var ( + tmpDir string + validDir string + invalidDir string + validURL weles.ArtifactURI = "validURL" + invalidURL weles.ArtifactURI = "invalidURL" + ts *httptest.Server + ch chan weles.ArtifactStatusChange + ) + + var ( + notifyCap int = 100 // notitication channel capacity. + notification chan weles.ArtifactStatusChange + workersCount = 8 + ) + + checkChannels := func(ch1, ch2 chan weles.ArtifactStatusChange, change weles.ArtifactStatusChange) { + Eventually(ch1).Should(Receive(Equal(change))) + Eventually(ch2).Should(Receive(Equal(change))) + } + + BeforeEach(func() { + + var err error + // prepare Downloader. + notification = make(chan weles.ArtifactStatusChange, notifyCap) + platinumKoala = NewDownloader(notification, workersCount) + + // prepare temporary directories. + tmpDir, err = ioutil.TempDir("", "weles-") + Expect(err).ToNot(HaveOccurred()) + validDir = filepath.Join(tmpDir, "valid") + err = os.MkdirAll(validDir, os.ModePerm) + Expect(err).ToNot(HaveOccurred()) + // directory is not created therefore path will be invalid. + invalidDir = filepath.Join(tmpDir, "invalid") + + ch = make(chan weles.ArtifactStatusChange, 5) + }) + + AfterEach(func() { + platinumKoala.Close() + err := os.RemoveAll(tmpDir) + Expect(err).ToNot(HaveOccurred()) + + }) + + prepareServer := func(url weles.ArtifactURI) *httptest.Server { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if url == validURL { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, pigs) + } else { + w.WriteHeader(http.StatusNotFound) + } + })) + return ts + } + + DescribeTable("getData(): Notify channels and save data to file", + func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) { + ts = prepareServer(url) + defer ts.Close() + + dir := validDir + if !valid { + dir = invalidDir + } + filename := weles.ArtifactPath(filepath.Join(dir, "test")) + + err := platinumKoala.getData(weles.ArtifactURI(ts.URL), weles.ArtifactPath(filename)) + + if valid && url != invalidURL { + Expect(err).ToNot(HaveOccurred()) + content, err := ioutil.ReadFile(string(filename)) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(Equal(pigs)) + } else { + Expect(string(filename)).NotTo(BeAnExistingFile()) + _, err := ioutil.ReadFile(string(filename)) + Expect(err).To(HaveOccurred()) + } + + }, + Entry("download valid file to valid path", validURL, true, weles.AM_READY), + Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED), + Entry("fail when path is invalid", validURL, false, weles.AM_FAILED), + Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED), + ) + + DescribeTable("download(): Notify channels and save data to file", + func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) { + ts = prepareServer(url) + defer ts.Close() + + dir := validDir + if !valid { + dir = invalidDir + } + filename := weles.ArtifactPath(filepath.Join(dir, "test")) + + status := weles.ArtifactStatusChange{filename, weles.AM_DOWNLOADING} + + platinumKoala.download(weles.ArtifactURI(ts.URL), weles.ArtifactPath(filename), ch) + + status.NewStatus = weles.AM_DOWNLOADING + checkChannels(ch, platinumKoala.notification, status) + + status.NewStatus = finalResult + checkChannels(ch, platinumKoala.notification, status) + + if valid && url != invalidURL { + content, err := ioutil.ReadFile(string(filename)) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(Equal(pigs)) + } else { + Expect(string(filename)).NotTo(BeAnExistingFile()) + } + + }, + Entry("download valid file to valid path", validURL, true, weles.AM_READY), + Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED), + Entry("fail when path is invalid", validURL, false, weles.AM_FAILED), + Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED), + ) + + DescribeTable("Download(): Notify ch channel about any changes", + func(url weles.ArtifactURI, valid bool, finalResult weles.ArtifactStatus) { + ts = prepareServer(url) + defer ts.Close() + + dir := validDir + if !valid { + dir = invalidDir + } + path := weles.ArtifactPath(filepath.Join(dir, "animal")) + + err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch) + Expect(err).ToNot(HaveOccurred()) + + status := weles.ArtifactStatusChange{path, weles.AM_PENDING} + Eventually(ch).Should(Receive(Equal(status))) + + status.NewStatus = weles.AM_DOWNLOADING + Eventually(ch).Should(Receive(Equal(status))) + + status.NewStatus = finalResult + Eventually(ch).Should(Receive(Equal(status))) + }, + Entry("download valid file to valid path", validURL, true, weles.AM_READY), + Entry("fail when url is invalid", invalidURL, true, weles.AM_FAILED), + Entry("fail when path is invalid", validURL, false, weles.AM_FAILED), + Entry("fail when url and path are invalid", invalidURL, false, weles.AM_FAILED), + ) + + DescribeTable("Download(): Download files to specified path.", + func(url weles.ArtifactURI, filename string, poem string) { + ts = prepareServer(url) + defer ts.Close() + + path := weles.ArtifactPath(filepath.Join(validDir, filename)) + + err := platinumKoala.Download(weles.ArtifactURI(ts.URL), path, ch) + Expect(err).ToNot(HaveOccurred()) + + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_PENDING}))) + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_DOWNLOADING}))) + + if poem != "" { + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_READY}))) + content, err := ioutil.ReadFile(string(path)) + Expect(err).ToNot(HaveOccurred()) + Expect(string(content)).To(BeIdenticalTo(poem)) + } else { + Eventually(ch).Should(Receive(Equal(weles.ArtifactStatusChange{path, weles.AM_FAILED}))) + content, err := ioutil.ReadFile(string(path)) + Expect(err).To(HaveOccurred()) + Expect(content).To(BeNil()) + + } + }, + Entry("download valid file to valid path", validURL, "pigs", pigs), + Entry("fail when url is invalid", invalidURL, "cows", nil), + ) + + Describe("DownloadJob queue capacity", func() { + It("should return error if queue if full.", func() { + ts = prepareServer(validURL) + + notification := make(chan weles.ArtifactStatusChange, notifyCap) + ironGopher := newDownloader(notification, 0, 0) + defer ironGopher.Close() + + path := weles.ArtifactPath(filepath.Join(validDir, "file")) + + err := ironGopher.Download(weles.ArtifactURI(ts.URL), path, ch) + Expect(err).To(Equal(ErrQueueFull)) + }) + }) +}) diff --git a/artifacts/downloader/errors.go b/artifacts/downloader/errors.go new file mode 100644 index 0000000..359da59 --- /dev/null +++ b/artifacts/downloader/errors.go @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2017 Samsung Electronics Co., Ltd All Rights Reserved + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License + */ + +// File errors.go provides definitions of downloader errors. + +package downloader + +import "errors" + +var ( + //ErrQueueFull is returned when download queue is full. + ErrQueueFull = errors.New("downlad queue is full") +) |