mirror of https://github.com/agola-io/agola
objectstorage: multiple updates and changes
* Use context.Context in functions * Always use "/" delimiter * Update to latest minio client * Remove unused posixflat storage
This commit is contained in:
parent
4574b0d5f4
commit
4f6eadb04a
6
go.mod
6
go.mod
|
@ -28,7 +28,7 @@ require (
|
|||
github.com/huandu/xstrings v1.4.0
|
||||
github.com/lib/pq v1.10.9
|
||||
github.com/mattn/go-sqlite3 v1.14.22
|
||||
github.com/minio/minio-go/v6 v6.0.57
|
||||
github.com/minio/minio-go/v7 v7.0.70
|
||||
github.com/mitchellh/copystructure v1.2.0
|
||||
github.com/mitchellh/go-homedir v1.1.0
|
||||
github.com/mitchellh/mapstructure v1.5.0
|
||||
|
@ -71,6 +71,7 @@ require (
|
|||
github.com/distribution/reference v0.6.0 // indirect
|
||||
github.com/docker/go-connections v0.5.0 // indirect
|
||||
github.com/docker/go-units v0.5.0 // indirect
|
||||
github.com/dustin/go-humanize v1.0.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.12.0 // indirect
|
||||
github.com/emirpasic/gods v1.18.1 // indirect
|
||||
github.com/felixge/httpsnoop v1.0.4 // indirect
|
||||
|
@ -82,6 +83,7 @@ require (
|
|||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.21.0 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/goccy/go-json v0.10.2 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
|
@ -107,7 +109,6 @@ require (
|
|||
github.com/mattn/go-colorable v0.1.13 // indirect
|
||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||
github.com/minio/md5-simd v1.1.2 // indirect
|
||||
github.com/minio/sha256-simd v1.0.0 // indirect
|
||||
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
|
||||
github.com/mitchellh/reflectwalk v1.0.2 // indirect
|
||||
github.com/moby/docker-image-spec v1.3.1 // indirect
|
||||
|
@ -122,6 +123,7 @@ require (
|
|||
github.com/opencontainers/go-digest v1.0.0 // indirect
|
||||
github.com/opencontainers/image-spec v1.1.0 // indirect
|
||||
github.com/pjbgf/sha1cd v0.3.0 // indirect
|
||||
github.com/rs/xid v1.5.0 // indirect
|
||||
github.com/sergi/go-diff v1.3.2-0.20230802210424-5b0b94c5c0d3 // indirect
|
||||
github.com/shopspring/decimal v1.4.0 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
|
|
30
go.sum
30
go.sum
|
@ -66,7 +66,8 @@ github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj
|
|||
github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc=
|
||||
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
|
||||
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
|
||||
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
|
||||
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.1 h1:m0kkaHRKEu7tUIUFVwhGGGYClXvyl4RE03qmvRTNfbw=
|
||||
github.com/elazarl/go-bindata-assetfs v1.0.1/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
|
||||
github.com/elazarl/goproxy v0.0.0-20230808193330-2592e75ae04a h1:mATvB/9r/3gvcejNsXKSkQ6lcIaNec2nyfOdlTBR2lU=
|
||||
|
@ -115,6 +116,8 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
|
|||
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
|
||||
github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68=
|
||||
github.com/go-test/deep v1.0.3/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
|
||||
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
|
||||
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
|
||||
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
|
||||
github.com/gofrs/uuid v4.4.0+incompatible h1:3qXRTX8/NbyulANqlc0lchS1gqAVxRgsuW1YrTJupqA=
|
||||
github.com/gofrs/uuid v4.4.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
|
||||
|
@ -148,7 +151,6 @@ github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6/go.mod h1:kf6iHlnVGwg
|
|||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
|
||||
github.com/gorilla/csrf v1.7.2 h1:oTUjx0vyf2T+wkrx09Trsev1TE+/EbDAeHtSTbtC2eI=
|
||||
github.com/gorilla/csrf v1.7.2/go.mod h1:F1Fj3KG23WYHE6gozCmBAezKookxbIvUJT+121wTuLk=
|
||||
github.com/gorilla/handlers v1.5.2 h1:cLTUSsNkgcwhgRqvCNmdbRWG0A3N4F+M2nWKdScwyEE=
|
||||
|
@ -190,10 +192,8 @@ github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 h1:BQSFePA1RWJOl
|
|||
github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99/go.mod h1:1lJo3i6rXxKeerYnT8Nvf0QmHCRC1n8sfWVwXF2Frvo=
|
||||
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
|
||||
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
|
||||
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
|
||||
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
|
||||
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
|
||||
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
|
||||
github.com/kevinburke/ssh_config v1.2.0 h1:x584FjTGwHzMwvHx18PXxbBVzfnxogHaAReU4gf13a4=
|
||||
|
@ -202,12 +202,9 @@ github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI
|
|||
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
|
||||
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
|
||||
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
|
||||
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
|
||||
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM=
|
||||
github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
|
||||
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||
|
@ -227,14 +224,10 @@ github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWE
|
|||
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
|
||||
github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU=
|
||||
github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
||||
github.com/minio/md5-simd v1.1.0/go.mod h1:XpBqgZULrMYD3R+M28PcmP0CkI7PEMzB3U77ZrKZ0Gw=
|
||||
github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34=
|
||||
github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM=
|
||||
github.com/minio/minio-go/v6 v6.0.57 h1:ixPkbKkyD7IhnluRgQpGSpHdpvNVaW6OD5R9IAO/9Tw=
|
||||
github.com/minio/minio-go/v6 v6.0.57/go.mod h1:5+R/nM9Pwrh0vqF+HbYYDQ84wdUFPyXHkrdT4AIkifM=
|
||||
github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM=
|
||||
github.com/minio/sha256-simd v1.0.0 h1:v1ta+49hkWZyvaKwrQB8elexRqm6Y0aMLjCNsrYxo6g=
|
||||
github.com/minio/sha256-simd v1.0.0/go.mod h1:OuYzVNI5vcoYIAmbIvHPl3N3jUzVedXbKy5RFepssQM=
|
||||
github.com/minio/minio-go/v7 v7.0.70 h1:1u9NtMgfK1U42kUxcsl5v0yj6TEOPR497OAQxpJnn2g=
|
||||
github.com/minio/minio-go/v7 v7.0.70/go.mod h1:4yBA8v80xGA30cfM3fz0DKYMXunWl/AV/6tWEs9ryzo=
|
||||
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
|
||||
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
|
||||
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
|
||||
|
@ -262,7 +255,6 @@ github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbD
|
|||
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
|
||||
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
|
||||
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
|
||||
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
|
||||
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
|
||||
|
@ -289,6 +281,7 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
|
|||
github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M=
|
||||
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
|
||||
github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/xid v1.5.0 h1:mKX4bl4iPYJtEIxp6CYiUuLQ/8DYMoz0PUdtGgMFRVc=
|
||||
github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
|
||||
github.com/rs/zerolog v1.26.1/go.mod h1:/wSSJWX7lVrsOwlbyTRSOJvqRlc+WjWlfes+CiJ+tmc=
|
||||
github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
|
||||
|
@ -303,14 +296,11 @@ github.com/sgotti/gexpect v0.0.0-20210315095146-1ec64e69809b/go.mod h1:iw90eoXMZ
|
|||
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
|
||||
github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k=
|
||||
github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME=
|
||||
github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo=
|
||||
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/skeema/knownhosts v1.2.2 h1:Iug2P4fLmDw9f41PB6thxUkNUkJzB5i+1/exaj40L3A=
|
||||
github.com/skeema/knownhosts v1.2.2/go.mod h1:xYbVRSPxqBZFrdmDyMmsOs+uX1UZC3nTN3ThzgDxUwo=
|
||||
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
|
||||
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/sorintlab/errors v0.0.0-20230328073828-75145c80e43d h1:OOO0G7VT35z8rN9pAnK0rBueL4HqZItHxEjWFP1DjlA=
|
||||
github.com/sorintlab/errors v0.0.0-20230328073828-75145c80e43d/go.mod h1:y6BMSfuxo7hQ7iJCppECcFn2jg5PKhkWvF5TMC6Q+3U=
|
||||
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
|
||||
|
@ -360,7 +350,6 @@ go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v8
|
|||
go.starlark.net v0.0.0-20240517230649-3792562d0b7f h1:APah0oANPHA7m/z/1Ngcccc+BEO/dmLcEfrzHAQQY6w=
|
||||
go.starlark.net v0.0.0-20240517230649-3792562d0b7f/go.mod h1:YKMCv9b1WrfWmeqdV5MAuEHWsu5iC+fe6kYl2sQjdI8=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20190513172903-22d7a77e9e5f/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
|
||||
|
@ -379,9 +368,7 @@ golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91
|
|||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
|
||||
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190522155817-f3200d17e092/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
|
@ -406,7 +393,6 @@ golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
|
|||
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
|
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
|
@ -446,7 +432,6 @@ golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
|
|||
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
|
||||
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
|
@ -474,7 +459,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntN
|
|||
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
|
||||
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
|
||||
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
|
||||
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/warnings.v0 v0.1.2 h1:wFXVbFY8DY5/xOe1ECiWdKCzZlxgshcYVNkBHstARME=
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/url"
|
||||
"os"
|
||||
|
@ -73,11 +74,9 @@ func WriteFileAtomic(filename string, data []byte, perm os.FileMode) error {
|
|||
})
|
||||
}
|
||||
|
||||
func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error) {
|
||||
var (
|
||||
err error
|
||||
ost objectstorage.Storage
|
||||
)
|
||||
func NewObjectStorage(ctx context.Context, c *config.ObjectStorage) (objectstorage.ObjStorage, error) {
|
||||
var err error
|
||||
var ost objectstorage.ObjStorage
|
||||
|
||||
switch c.Type {
|
||||
case config.ObjectStorageTypePosix:
|
||||
|
@ -100,11 +99,11 @@ func NewObjectStorage(c *config.ObjectStorage) (*objectstorage.ObjStorage, error
|
|||
return nil, errors.Errorf("wrong s3 endpoint scheme %q (must be http or https)", u.Scheme)
|
||||
}
|
||||
}
|
||||
ost, err = objectstorage.NewS3(c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure)
|
||||
ost, err = objectstorage.NewS3(ctx, c.Bucket, c.Location, endpoint, c.AccessKey, c.SecretAccessKey, secure)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create s3 object storage")
|
||||
}
|
||||
}
|
||||
|
||||
return objectstorage.NewObjStorage(ost, "/"), nil
|
||||
return ost, nil
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package objectstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
|
@ -23,17 +24,17 @@ import (
|
|||
"agola.io/agola/internal/util"
|
||||
)
|
||||
|
||||
type Storage interface {
|
||||
Stat(filepath string) (*ObjectInfo, error)
|
||||
ReadObject(filepath string) (ReadSeekCloser, error)
|
||||
type ObjStorage interface {
|
||||
Stat(ctx context.Context, filepath string) (*ObjectInfo, error)
|
||||
ReadObject(ctx context.Context, filepath string) (ReadSeekCloser, error)
|
||||
// WriteObject atomically writes an object. If size is greater or equal to
|
||||
// zero then only size bytes will be read from data and wrote. If size is
|
||||
// less than zero data will be wrote until EOF. When persist is true the
|
||||
// implementation must ensure that data is persisted to the underlying
|
||||
// storage.
|
||||
WriteObject(filepath string, data io.Reader, size int64, persist bool) error
|
||||
DeleteObject(filepath string) error
|
||||
List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo
|
||||
WriteObject(ctx context.Context, filepath string, data io.Reader, size int64, persist bool) error
|
||||
DeleteObject(ctx context.Context, filepath string) error
|
||||
List(ctx context.Context, prefix, startAfter string, recursive bool) <-chan ObjectInfo
|
||||
}
|
||||
|
||||
type ErrNotExist struct {
|
||||
|
@ -64,26 +65,3 @@ type ObjectInfo struct {
|
|||
|
||||
Err error
|
||||
}
|
||||
|
||||
// ObjStorage wraps a Storage providing additional helper functions
|
||||
type ObjStorage struct {
|
||||
Storage
|
||||
delimiter string
|
||||
}
|
||||
|
||||
func NewObjStorage(s Storage, delimiter string) *ObjStorage {
|
||||
return &ObjStorage{Storage: s, delimiter: delimiter}
|
||||
}
|
||||
|
||||
func (s *ObjStorage) Delimiter() string {
|
||||
return s.delimiter
|
||||
}
|
||||
|
||||
func (s *ObjStorage) List(prefix, startWith string, recursive bool, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||
delimiter := s.delimiter
|
||||
if recursive {
|
||||
delimiter = ""
|
||||
}
|
||||
|
||||
return s.Storage.List(prefix, startWith, delimiter, doneCh)
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package objectstorage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -29,14 +30,10 @@ import (
|
|||
"agola.io/agola/internal/testutil"
|
||||
)
|
||||
|
||||
func setupPosix(t *testing.T, dir string) (*PosixStorage, error) {
|
||||
func setupPosix(dir string) (*PosixStorage, error) {
|
||||
return NewPosix(path.Join(dir, "posix"))
|
||||
}
|
||||
|
||||
func setupPosixFlat(t *testing.T, dir string) (*PosixFlatStorage, error) {
|
||||
return NewPosixFlat(path.Join(dir, "posixflat"))
|
||||
}
|
||||
|
||||
func setupS3(t *testing.T, dir string) (*S3Storage, error) {
|
||||
minioEndpoint := os.Getenv("MINIO_ENDPOINT")
|
||||
minioAccessKey := os.Getenv("MINIO_ACCESSKEY")
|
||||
|
@ -46,16 +43,13 @@ func setupS3(t *testing.T, dir string) (*S3Storage, error) {
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
return NewS3(filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false)
|
||||
return NewS3(context.Background(), filepath.Base(dir), "", minioEndpoint, minioAccessKey, minioSecretKey, false)
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
ps, err := setupPosix(t, dir)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
pfs, err := setupPosixFlat(t, dir)
|
||||
ps, err := setupPosix(dir)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
s3s, err := setupS3(t, dir)
|
||||
|
@ -68,31 +62,23 @@ func TestList(t *testing.T) {
|
|||
expected []string
|
||||
}
|
||||
tests := []struct {
|
||||
s map[string]Storage
|
||||
s map[string]ObjStorage
|
||||
objects []string
|
||||
ops []listop
|
||||
}{
|
||||
{
|
||||
map[string]Storage{"posixflat": pfs},
|
||||
map[string]ObjStorage{},
|
||||
[]string{
|
||||
// Minio (as of 20190201) IMHO is not real S3 since it tries to map to a
|
||||
// file system and not a flat namespace like S3. For this reason this test
|
||||
// won't work with minio beacuse it creates a file called "path/of" and so
|
||||
// won't work with minio because it creates a file called "path/of" and so
|
||||
// it's not possible to create a file "path/of/a" because it needs "of" to
|
||||
// be a directory
|
||||
// The same for the posix storage since it uses a posix filesystem.
|
||||
|
||||
// All of the below tests will fail on Minio due to the above reason and also the multiple '/'
|
||||
// so we aren't testing these with it
|
||||
// All of the below tests will fail on Minio and posix due to the above
|
||||
// reasons also due to the multiple '/' so we aren't testing these with them.
|
||||
|
||||
//"path/of",
|
||||
//"path/of/a/file02",
|
||||
//"path/of/a/file03",
|
||||
//"path/of/a/file04",
|
||||
//"path/of/a/file05",
|
||||
|
||||
// These are multiple of 8 chars on purpose to test the filemarker behavior to
|
||||
// distinguish between a file or a directory when the files ends at the path
|
||||
// separator point
|
||||
"s3/is/not/a/file///system/file01",
|
||||
"s3/is/not/a/file///system/file02",
|
||||
"s3/is/not/a/file///system/file03",
|
||||
|
@ -183,11 +169,8 @@ func TestList(t *testing.T) {
|
|||
},
|
||||
},
|
||||
{
|
||||
map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s},
|
||||
map[string]ObjStorage{"posix": ps, "minio": s3s},
|
||||
[]string{
|
||||
// These are multiple of 8 chars on purpose to test the filemarker behavior to
|
||||
// distinguish between a file or a directory when the files ends at the path
|
||||
// separator point
|
||||
"s3/is/not/a/file/sy/st/em/file01",
|
||||
"s3/is/not/a/file/sy/st/em/file02",
|
||||
"s3/is/not/a/file/sy/st/em/file03",
|
||||
|
@ -260,27 +243,26 @@ func TestList(t *testing.T) {
|
|||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
for sname, s := range tt.s {
|
||||
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
||||
switch s := s.(type) {
|
||||
for sname, os := range tt.s {
|
||||
t.Run(fmt.Sprintf("with storage type %s", sname), func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
switch s := os.(type) {
|
||||
case *S3Storage:
|
||||
if s == nil {
|
||||
t.SkipNow()
|
||||
}
|
||||
}
|
||||
os := NewObjStorage(s, "/")
|
||||
|
||||
// populate
|
||||
for _, p := range tt.objects {
|
||||
if err := os.WriteObject(p, strings.NewReader(""), 0, true); err != nil {
|
||||
if err := os.WriteObject(ctx, p, strings.NewReader(""), 0, true); err != nil {
|
||||
t.Fatalf("%s %d err: %v", sname, i, err)
|
||||
}
|
||||
}
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
for j, op := range tt.ops {
|
||||
paths := []string{}
|
||||
for object := range os.List(op.prefix, op.start, op.recursive, doneCh) {
|
||||
for object := range os.List(ctx, op.prefix, op.start, op.recursive) {
|
||||
if object.Err != nil {
|
||||
t.Fatalf("%s %d-%d err: %v", sname, i, j, object.Err)
|
||||
return
|
||||
|
@ -299,10 +281,7 @@ func TestList(t *testing.T) {
|
|||
func TestWriteObject(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
|
||||
ps, err := setupPosix(t, dir)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
pfs, err := setupPosixFlat(t, dir)
|
||||
ps, err := setupPosix(dir)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
s3s, err := setupS3(t, dir)
|
||||
|
@ -316,25 +295,26 @@ func TestWriteObject(t *testing.T) {
|
|||
return bytes.NewBuffer(testBytes)
|
||||
}
|
||||
|
||||
for sname, s := range map[string]Storage{"posix": ps, "posixflat": pfs, "minio": s3s} {
|
||||
t.Run(fmt.Sprintf("test with storage type %s", sname), func(t *testing.T) {
|
||||
switch s := s.(type) {
|
||||
for sname, os := range map[string]ObjStorage{"posix": ps, "minio": s3s} {
|
||||
t.Run(fmt.Sprintf("with storage type %s", sname), func(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
switch s := os.(type) {
|
||||
case *S3Storage:
|
||||
if s == nil {
|
||||
t.SkipNow()
|
||||
}
|
||||
}
|
||||
os := NewObjStorage(s, "/")
|
||||
|
||||
n := int64(10000)
|
||||
|
||||
// Test write without size. Should write whole buffer.
|
||||
buf := newBuf(n)
|
||||
objName := "obj01"
|
||||
err := os.WriteObject(objName, buf, -1, false)
|
||||
err := os.WriteObject(ctx, objName, buf, -1, false)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
oi, err := os.Stat(objName)
|
||||
oi, err := os.Stat(ctx, objName)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
assert.Equal(t, oi.Size, n, "expected object size")
|
||||
|
@ -342,10 +322,10 @@ func TestWriteObject(t *testing.T) {
|
|||
// Test write with object size equal to buf size.
|
||||
buf = newBuf(n)
|
||||
objName = "obj02"
|
||||
err = os.WriteObject(objName, buf, n, false)
|
||||
err = os.WriteObject(ctx, objName, buf, n, false)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
oi, err = os.Stat(objName)
|
||||
oi, err = os.Stat(ctx, objName)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
assert.Equal(t, oi.Size, n, "expected object size")
|
||||
|
@ -354,10 +334,10 @@ func TestWriteObject(t *testing.T) {
|
|||
buf = newBuf(n)
|
||||
objName = "obj03"
|
||||
size := int64(800)
|
||||
err = os.WriteObject(objName, buf, int64(size), false)
|
||||
err = os.WriteObject(ctx, objName, buf, int64(size), false)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
oi, err = os.Stat(objName)
|
||||
oi, err = os.Stat(ctx, objName)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
assert.Equal(t, oi.Size, size, "expected object size")
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
package objectstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
|
@ -22,6 +23,8 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/sorintlab/errors"
|
||||
|
||||
"agola.io/agola/internal/util"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -56,7 +59,7 @@ func (s *PosixStorage) fsPath(p string) (string, error) {
|
|||
return filepath.Join(s.dataDir, p), nil
|
||||
}
|
||||
|
||||
func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) {
|
||||
func (s *PosixStorage) Stat(ctx context.Context, p string) (*ObjectInfo, error) {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
@ -73,7 +76,7 @@ func (s *PosixStorage) Stat(p string) (*ObjectInfo, error) {
|
|||
return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
||||
}
|
||||
|
||||
func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
|
||||
func (s *PosixStorage) ReadObject(ctx context.Context, p string) (ReadSeekCloser, error) {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
@ -86,7 +89,7 @@ func (s *PosixStorage) ReadObject(p string) (ReadSeekCloser, error) {
|
|||
return f, errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error {
|
||||
func (s *PosixStorage) WriteObject(ctx context.Context, p string, data io.Reader, size int64, persist bool) error {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -106,7 +109,7 @@ func (s *PosixStorage) WriteObject(p string, data io.Reader, size int64, persist
|
|||
})
|
||||
}
|
||||
|
||||
func (s *PosixStorage) DeleteObject(p string) error {
|
||||
func (s *PosixStorage) DeleteObject(ctx context.Context, p string) error {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
|
@ -148,21 +151,14 @@ func (s *PosixStorage) DeleteObject(p string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||
func (s *PosixStorage) List(ctx context.Context, prefix, startAfter string, recursive bool) <-chan ObjectInfo {
|
||||
objectCh := make(chan ObjectInfo, 1)
|
||||
|
||||
if len(delimiter) > 1 {
|
||||
objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
||||
if startAfter != "" && !strings.Contains(startAfter, prefix) {
|
||||
objectCh <- ObjectInfo{Err: errors.Errorf("wrong startAfter value %q for prefix %q", startAfter, prefix)}
|
||||
return objectCh
|
||||
}
|
||||
|
||||
if startWith != "" && !strings.Contains(startWith, prefix) {
|
||||
objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
||||
return objectCh
|
||||
}
|
||||
|
||||
recursive := delimiter == ""
|
||||
|
||||
// remove leading slash from prefix
|
||||
prefix = strings.TrimPrefix(prefix, "/")
|
||||
|
||||
|
@ -173,10 +169,18 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
|||
}
|
||||
|
||||
// remove leading slash
|
||||
startWith = strings.TrimPrefix(startWith, "/")
|
||||
startAfter = strings.TrimPrefix(startAfter, "/")
|
||||
|
||||
go func(objectCh chan<- ObjectInfo) {
|
||||
defer close(objectCh)
|
||||
defer func() {
|
||||
if util.ContextCanceled(ctx) {
|
||||
objectCh <- ObjectInfo{
|
||||
Err: ctx.Err(),
|
||||
}
|
||||
}
|
||||
close(objectCh)
|
||||
}()
|
||||
|
||||
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return errors.WithStack(err)
|
||||
|
@ -195,7 +199,7 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
|||
}
|
||||
if !recursive && len(p) > len(prefix) {
|
||||
rel := strings.TrimPrefix(p, prefix)
|
||||
skip := strings.Contains(rel, delimiter)
|
||||
skip := strings.Contains(rel, "/")
|
||||
|
||||
if info.IsDir() && skip {
|
||||
return filepath.SkipDir
|
||||
|
@ -209,21 +213,20 @@ func (s *PosixStorage) List(prefix, startWith, delimiter string, doneCh <-chan s
|
|||
return nil
|
||||
}
|
||||
|
||||
if strings.HasPrefix(p, prefix) && p > startWith {
|
||||
if strings.HasPrefix(p, prefix) && p > startAfter {
|
||||
select {
|
||||
// Send object content.
|
||||
case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
||||
// If receives done from the caller, return here.
|
||||
case <-doneCh:
|
||||
return io.EOF
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
objectCh <- ObjectInfo{
|
||||
Err: err,
|
||||
if err != nil {
|
||||
select {
|
||||
case objectCh <- ObjectInfo{Err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -16,6 +16,7 @@ package objectstorage
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
@ -27,6 +28,7 @@ import (
|
|||
)
|
||||
|
||||
func TestPosixDeleteObject(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
objects := []string{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
||||
|
||||
dir := t.TempDir()
|
||||
|
@ -35,10 +37,10 @@ func TestPosixDeleteObject(t *testing.T) {
|
|||
testutil.NilError(t, err)
|
||||
|
||||
for _, obj := range objects {
|
||||
err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true)
|
||||
err := ls.WriteObject(ctx, obj, bytes.NewReader([]byte{}), 0, true)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
err = ls.DeleteObject(obj)
|
||||
err = ls.DeleteObject(ctx, obj)
|
||||
testutil.NilError(t, err)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,430 +0,0 @@
|
|||
// Copyright 2019 Sorint.lab
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package objectstorage
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"unicode/utf8"
|
||||
|
||||
"github.com/sorintlab/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
splitLength = 8
|
||||
)
|
||||
|
||||
func shouldEscape(c rune) bool {
|
||||
return c == '/' || c == '%'
|
||||
}
|
||||
|
||||
// escape does percent encoding to '/' and adds a slash every 8 (of the original
|
||||
// string) chars
|
||||
func escape(s string) string {
|
||||
sepCount, hexCount := 0, 0
|
||||
nc := 0
|
||||
for _, c := range s {
|
||||
nc++
|
||||
if shouldEscape(c) {
|
||||
hexCount++
|
||||
}
|
||||
if nc%splitLength == 0 {
|
||||
sepCount++
|
||||
}
|
||||
}
|
||||
|
||||
if sepCount == 0 && hexCount == 0 {
|
||||
return s
|
||||
}
|
||||
|
||||
hasFileMarker := nc%splitLength == 0
|
||||
l := len(s) + sepCount + 2*hexCount
|
||||
// if the string length is a multiple of 8 then we have to add a file marker
|
||||
// ".f" to not ovverride a possible directory in our fs representation
|
||||
if hasFileMarker {
|
||||
l++
|
||||
}
|
||||
|
||||
t := make([]byte, l)
|
||||
j := 0
|
||||
nc = 0
|
||||
for _, c := range s {
|
||||
nc++
|
||||
switch {
|
||||
case shouldEscape(c):
|
||||
t[j] = '%'
|
||||
t[j+1] = "0123456789ABCDEF"[c>>4]
|
||||
t[j+2] = "0123456789ABCDEF"[c&15]
|
||||
j += 3
|
||||
default:
|
||||
s := string(c)
|
||||
for i := 0; i < len(s); i++ {
|
||||
t[j] = s[i]
|
||||
j++
|
||||
}
|
||||
}
|
||||
if nc%splitLength == 0 {
|
||||
t[j] = '/'
|
||||
j++
|
||||
}
|
||||
}
|
||||
|
||||
// add file marker
|
||||
if hasFileMarker {
|
||||
t[j-1] = '.'
|
||||
t[j] = 'f'
|
||||
}
|
||||
|
||||
return string(t)
|
||||
}
|
||||
|
||||
func ishex(c byte) bool {
|
||||
switch {
|
||||
case '0' <= c && c <= '9':
|
||||
return true
|
||||
case 'a' <= c && c <= 'f':
|
||||
return true
|
||||
case 'A' <= c && c <= 'F':
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func unhex(c byte) byte {
|
||||
switch {
|
||||
case '0' <= c && c <= '9':
|
||||
return c - '0'
|
||||
case 'a' <= c && c <= 'f':
|
||||
return c - 'a' + 10
|
||||
case 'A' <= c && c <= 'F':
|
||||
return c - 'A' + 10
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type EscapeError string
|
||||
|
||||
func (e EscapeError) Error() string {
|
||||
return "invalid URL escape " + strconv.Quote(string(e))
|
||||
}
|
||||
|
||||
func unescape(s string) (string, bool, error) {
|
||||
// number of percent encoded
|
||||
n := 0
|
||||
// number of slashes
|
||||
ns := 0
|
||||
// number of char in the unescaped string
|
||||
nc := 0
|
||||
|
||||
for i := 0; i < len(s); {
|
||||
r, width := utf8.DecodeRuneInString(s[i:])
|
||||
if r == utf8.RuneError {
|
||||
return "", false, errors.Errorf("bad UTF-8 string")
|
||||
}
|
||||
switch r {
|
||||
case '%':
|
||||
n++
|
||||
if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
|
||||
s = s[i:]
|
||||
if len(s) > 3 {
|
||||
s = s[:3]
|
||||
}
|
||||
return "", false, EscapeError(s)
|
||||
}
|
||||
i += 3
|
||||
nc++
|
||||
case '/':
|
||||
ns++
|
||||
if nc%splitLength != 0 {
|
||||
return "", false, EscapeError(s)
|
||||
}
|
||||
i++
|
||||
default:
|
||||
i += width
|
||||
nc++
|
||||
}
|
||||
}
|
||||
|
||||
// check and remove trailing file marker
|
||||
hasFileMarker := false
|
||||
if nc > splitLength && nc%splitLength == 2 && s[len(s)-2:] == ".f" {
|
||||
hasFileMarker = true
|
||||
s = s[:len(s)-2]
|
||||
}
|
||||
|
||||
if n == 0 && ns == 0 {
|
||||
return s, hasFileMarker, nil
|
||||
}
|
||||
|
||||
// destination string is
|
||||
// the length of the escaped one (with the ending file marker already removed) - number of percent * 2 - number os slashes
|
||||
t := make([]byte, len(s)-n*2-ns)
|
||||
j := 0
|
||||
for i := 0; i < len(s); {
|
||||
r, width := utf8.DecodeRuneInString(s[i:])
|
||||
if r == utf8.RuneError {
|
||||
return "", false, errors.Errorf("bad UTF-8 string")
|
||||
}
|
||||
switch r {
|
||||
case '%':
|
||||
t[j] = unhex(s[i+1])<<4 | unhex(s[i+2])
|
||||
j++
|
||||
i += 3
|
||||
case '/':
|
||||
// skip "/"
|
||||
i++
|
||||
default:
|
||||
for k := 0; k < width; k++ {
|
||||
t[j] = s[i]
|
||||
j++
|
||||
i++
|
||||
}
|
||||
}
|
||||
}
|
||||
return string(t), hasFileMarker, nil
|
||||
}
|
||||
|
||||
type PosixFlatStorage struct {
|
||||
dataDir string
|
||||
tmpDir string
|
||||
}
|
||||
|
||||
func NewPosixFlat(baseDir string) (*PosixFlatStorage, error) {
|
||||
if err := os.MkdirAll(baseDir, 0770); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
dataDir := filepath.Join(baseDir, dataDirName)
|
||||
tmpDir := filepath.Join(baseDir, tmpDirName)
|
||||
if err := os.MkdirAll(dataDir, 0770); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create data dir")
|
||||
}
|
||||
if err := os.MkdirAll(tmpDir, 0770); err != nil {
|
||||
return nil, errors.Wrapf(err, "failed to create tmp dir")
|
||||
}
|
||||
return &PosixFlatStorage{
|
||||
dataDir: dataDir,
|
||||
tmpDir: tmpDir,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) fsPath(p string) (string, error) {
|
||||
if p == "" {
|
||||
return "", errors.Errorf("empty key name")
|
||||
}
|
||||
return filepath.Join(s.dataDir, escape(p)), nil
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) Stat(p string) (*ObjectInfo, error) {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
fi, err := os.Stat(fspath)
|
||||
if err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil, NewErrNotExist(err, "object %q doesn't exist", p)
|
||||
}
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return &ObjectInfo{Path: p, LastModified: fi.ModTime(), Size: fi.Size()}, nil
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) ReadObject(p string) (ReadSeekCloser, error) {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
f, err := os.Open(fspath)
|
||||
if err != nil && errors.Is(err, os.ErrNotExist) {
|
||||
return nil, NewErrNotExist(err, "object %q doesn't exist", p)
|
||||
}
|
||||
return f, errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) WriteObject(p string, data io.Reader, size int64, persist bool) error {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(path.Dir(fspath), 0770); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
r := data
|
||||
if size >= 0 {
|
||||
r = io.LimitReader(data, size)
|
||||
}
|
||||
return writeFileAtomicFunc(fspath, s.dataDir, s.tmpDir, 0660, persist, func(f io.Writer) error {
|
||||
_, err := io.Copy(f, r)
|
||||
return errors.WithStack(err)
|
||||
})
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) DeleteObject(p string) error {
|
||||
fspath, err := s.fsPath(p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := os.Remove(fspath); err != nil {
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return NewErrNotExist(err, "object %q doesn't exist", p)
|
||||
}
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
// try to remove parent empty dirs
|
||||
// TODO(sgotti) if this fails we ignore errors and the dirs will be left as
|
||||
// empty, clean them asynchronously
|
||||
pdir := filepath.Dir(fspath)
|
||||
for {
|
||||
if pdir == s.dataDir || !strings.HasPrefix(pdir, s.dataDir) {
|
||||
break
|
||||
}
|
||||
f, err := os.Open(pdir)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err = f.Readdirnames(1)
|
||||
if errors.Is(err, io.EOF) {
|
||||
f.Close()
|
||||
if err := os.Remove(pdir); err != nil {
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
f.Close()
|
||||
break
|
||||
}
|
||||
|
||||
pdir = filepath.Dir(pdir)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *PosixFlatStorage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||
objectCh := make(chan ObjectInfo, 1)
|
||||
|
||||
if len(delimiter) > 1 {
|
||||
objectCh <- ObjectInfo{Err: errors.Errorf("wrong delimiter %q", delimiter)}
|
||||
return objectCh
|
||||
}
|
||||
|
||||
if startWith != "" && !strings.Contains(startWith, prefix) {
|
||||
objectCh <- ObjectInfo{Err: errors.Errorf("wrong startwith value %q for prefix %q", startWith, prefix)}
|
||||
return objectCh
|
||||
}
|
||||
|
||||
recursive := delimiter == ""
|
||||
|
||||
// remove leading slash from prefix
|
||||
prefix = strings.TrimPrefix(prefix, "/")
|
||||
|
||||
fprefix := filepath.Join(s.dataDir, escape(prefix))
|
||||
root := filepath.Dir(fprefix)
|
||||
if len(root) < len(s.dataDir) {
|
||||
root = s.dataDir
|
||||
}
|
||||
|
||||
// remove leading slash
|
||||
startWith = strings.TrimPrefix(startWith, "/")
|
||||
|
||||
go func(objectCh chan<- ObjectInfo) {
|
||||
var prevp string
|
||||
defer close(objectCh)
|
||||
err := filepath.Walk(root, func(ep string, info os.FileInfo, err error) error {
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
return nil
|
||||
}
|
||||
p := ep
|
||||
|
||||
// get the path with / separator
|
||||
p = filepath.ToSlash(p)
|
||||
|
||||
p, err = filepath.Rel(s.dataDir, p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
p, _, err = unescape(p)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if !recursive && len(p) > len(prefix) {
|
||||
rel := strings.TrimPrefix(p, prefix)
|
||||
skip := strings.Contains(rel, delimiter)
|
||||
|
||||
if info.IsDir() && skip {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
if skip {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// don't list dirs if there's not a file with the same name (with filemarker)
|
||||
// it's not an issue if the file in the meantime has been removed, it won't
|
||||
// just be listed
|
||||
hasFile := true
|
||||
_, err = os.Stat(ep + ".f")
|
||||
if err != nil && !errors.Is(err, os.ErrNotExist) {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
hasFile = false
|
||||
}
|
||||
if info.IsDir() && !hasFile {
|
||||
return nil
|
||||
}
|
||||
|
||||
if strings.HasPrefix(p, prefix) && p > startWith {
|
||||
// skip keys smaller than the previously returned one. This happens when we
|
||||
// receive a file with a file marker that we already returned previously
|
||||
// when we received a dir with the same name
|
||||
// it'not an issue if the dir has been removed since we already returned the file
|
||||
if p > prevp {
|
||||
select {
|
||||
// Send object content.
|
||||
case objectCh <- ObjectInfo{Path: p, LastModified: info.ModTime(), Size: info.Size()}:
|
||||
// If receives done from the caller, return here.
|
||||
case <-doneCh:
|
||||
return io.EOF
|
||||
}
|
||||
}
|
||||
prevp = p
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil && !errors.Is(err, io.EOF) {
|
||||
objectCh <- ObjectInfo{
|
||||
Err: err,
|
||||
}
|
||||
return
|
||||
}
|
||||
}(objectCh)
|
||||
|
||||
return objectCh
|
||||
}
|
|
@ -1,108 +0,0 @@
|
|||
// Copyright 2019 Sorint.lab
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package objectstorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/sorintlab/errors"
|
||||
"gotest.tools/assert"
|
||||
|
||||
"agola.io/agola/internal/testutil"
|
||||
)
|
||||
|
||||
func TestEscapeUnescape(t *testing.T) {
|
||||
tests := []struct {
|
||||
in string
|
||||
expected string
|
||||
err error
|
||||
}{
|
||||
{"", "", nil},
|
||||
{"/", "%2F", nil},
|
||||
{"//", "%2F%2F", nil},
|
||||
{"☺", "☺", nil},
|
||||
{"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil},
|
||||
{"☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺.f", nil},
|
||||
{"☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺☺☺☺☺/☺☺☺☺☺☺☺☺.f", nil},
|
||||
{"☺☺☺☺a☺☺☺☺☺☺☺☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺☺☺☺☺☺.f", nil},
|
||||
{"☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "☺☺☺☺a☺☺☺/☺☺☺b☺☺☺☺.f", nil},
|
||||
{"⌘", "⌘", nil},
|
||||
{"⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘⌘", "⌘⌘⌘⌘⌘⌘⌘⌘/⌘⌘⌘", nil},
|
||||
|
||||
// These are 16 chars on purpose to test the filemarker behavior
|
||||
{"s3/is/not/a/file", "s3%2Fis%2Fno/t%2Fa%2Ffile.f", nil},
|
||||
{"s3/is/nota/file/", "s3%2Fis%2Fno/ta%2Ffile%2F.f", nil},
|
||||
{"s3/is/nota/files", "s3%2Fis%2Fno/ta%2Ffiles.f", nil},
|
||||
{"s3/is/nota/fil.f", "s3%2Fis%2Fno/ta%2Ffil.f.f", nil},
|
||||
|
||||
{"s3/is/nota/fil.fa", "s3%2Fis%2Fno/ta%2Ffil.f/a", nil},
|
||||
{"/s3/is/nota/fil.fa/", "%2Fs3%2Fis%2Fn/ota%2Ffil./fa%2F", nil},
|
||||
{"s3/is/not/a/file///system/fi%l%%e01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffi%25l%25%25/e01", nil},
|
||||
{"s3/is/not/a/file///system/file01", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01.f", nil},
|
||||
{"s3/is/not/a/file///system/file01/", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2F", nil},
|
||||
{"s3/is/not/a/file///system/file01/a", "s3%2Fis%2Fno/t%2Fa%2Ffile/%2F%2F%2Fsyste/m%2Ffile01/%2Fa", nil},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
out := escape(tt.in)
|
||||
if out != tt.expected {
|
||||
t.Fatalf("%d: escape: expected %q got %q", i, tt.expected, out)
|
||||
}
|
||||
|
||||
unescaped, _, err := unescape(out)
|
||||
if err != nil {
|
||||
if tt.err == nil {
|
||||
t.Fatalf("%d: unescape: expected no error got %v", i, err)
|
||||
} else if !errors.Is(tt.err, err) {
|
||||
t.Fatalf("%d: unescape: expected error %v got %v", i, tt.err, err)
|
||||
}
|
||||
} else {
|
||||
if tt.err != nil {
|
||||
t.Fatalf("%d: unescape: expected error %v got no error", i, tt.err)
|
||||
} else if unescaped != tt.in {
|
||||
t.Fatalf("%d: unescape: expected %q got %q", i, tt.in, unescaped)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPosixFlatDeleteObject(t *testing.T) {
|
||||
objects := []string{"/", "//", "☺☺☺☺a☺☺☺☺☺☺b☺☺☺☺", "s3/is/nota/fil.fa", "s3/is/not/a/file///system/fi%l%%e01"}
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
ls, err := NewPosixFlat(dir)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
for _, obj := range objects {
|
||||
err := ls.WriteObject(obj, bytes.NewReader([]byte{}), 0, true)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
err = ls.DeleteObject(obj)
|
||||
testutil.NilError(t, err)
|
||||
}
|
||||
|
||||
// no files and directories should be left
|
||||
bd, err := os.Open(filepath.Join(dir, dataDirName))
|
||||
testutil.NilError(t, err)
|
||||
|
||||
files, err := bd.Readdirnames(0)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
assert.Equal(t, len(files), 0)
|
||||
}
|
|
@ -15,13 +15,17 @@
|
|||
package objectstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
minio "github.com/minio/minio-go/v6"
|
||||
minio "github.com/minio/minio-go/v7"
|
||||
"github.com/minio/minio-go/v7/pkg/credentials"
|
||||
"github.com/sorintlab/errors"
|
||||
|
||||
"agola.io/agola/internal/util"
|
||||
)
|
||||
|
||||
type S3Storage struct {
|
||||
|
@ -31,23 +35,23 @@ type S3Storage struct {
|
|||
minioCore *minio.Core
|
||||
}
|
||||
|
||||
func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
|
||||
minioClient, err := minio.New(endpoint, accessKeyID, secretAccessKey, secure)
|
||||
func NewS3(ctx context.Context, bucket, location, endpoint, accessKeyID, secretAccessKey string, secure bool) (*S3Storage, error) {
|
||||
minioClient, err := minio.New(endpoint, &minio.Options{Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Secure: secure})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
minioCore, err := minio.NewCore(endpoint, accessKeyID, secretAccessKey, secure)
|
||||
minioCore, err := minio.NewCore(endpoint, &minio.Options{Creds: credentials.NewStaticV4(accessKeyID, secretAccessKey, ""), Secure: secure})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
exists, err := minioClient.BucketExists(bucket)
|
||||
exists, err := minioClient.BucketExists(ctx, bucket)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot check if bucket %q in location %q exits", bucket, location)
|
||||
}
|
||||
if !exists {
|
||||
if err := minioClient.MakeBucket(bucket, location); err != nil {
|
||||
if err := minioClient.MakeBucket(ctx, bucket, minio.MakeBucketOptions{Region: location}); err != nil {
|
||||
return nil, errors.Wrapf(err, "cannot create bucket %q in location %q", bucket, location)
|
||||
}
|
||||
}
|
||||
|
@ -59,8 +63,8 @@ func NewS3(bucket, location, endpoint, accessKeyID, secretAccessKey string, secu
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
|
||||
oi, err := s.minioClient.StatObject(s.bucket, p, minio.StatObjectOptions{})
|
||||
func (s *S3Storage) Stat(ctx context.Context, p string) (*ObjectInfo, error) {
|
||||
oi, err := s.minioClient.StatObject(ctx, s.bucket, p, minio.StatObjectOptions{})
|
||||
if err != nil {
|
||||
merr := minio.ToErrorResponse(err)
|
||||
if merr.StatusCode == http.StatusNotFound {
|
||||
|
@ -72,8 +76,8 @@ func (s *S3Storage) Stat(p string) (*ObjectInfo, error) {
|
|||
return &ObjectInfo{Path: p, LastModified: oi.LastModified, Size: oi.Size}, nil
|
||||
}
|
||||
|
||||
func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
|
||||
if _, err := s.minioClient.StatObject(s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
|
||||
func (s *S3Storage) ReadObject(ctx context.Context, filepath string) (ReadSeekCloser, error) {
|
||||
if _, err := s.minioClient.StatObject(ctx, s.bucket, filepath, minio.StatObjectOptions{}); err != nil {
|
||||
merr := minio.ToErrorResponse(err)
|
||||
if merr.StatusCode == http.StatusNotFound {
|
||||
return nil, NewErrNotExist(err, "object %q doesn't exist", filepath)
|
||||
|
@ -81,19 +85,19 @@ func (s *S3Storage) ReadObject(filepath string) (ReadSeekCloser, error) {
|
|||
return nil, errors.WithStack(merr)
|
||||
}
|
||||
|
||||
o, err := s.minioClient.GetObject(s.bucket, filepath, minio.GetObjectOptions{})
|
||||
o, err := s.minioClient.GetObject(ctx, s.bucket, filepath, minio.GetObjectOptions{})
|
||||
|
||||
return o, errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, persist bool) error {
|
||||
func (s *S3Storage) WriteObject(ctx context.Context, filepath string, data io.Reader, size int64, persist bool) error {
|
||||
// if size is not specified, limit max object size to defaultMaxObjectSize so
|
||||
// minio client will not calculate a very big part size using tons of ram.
|
||||
// An alternative is to write the file locally so we can calculate the size and
|
||||
// then put it. See commented out code below.
|
||||
if size >= 0 {
|
||||
lr := io.LimitReader(data, size)
|
||||
_, err := s.minioClient.PutObject(s.bucket, filepath, lr, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
||||
_, err := s.minioClient.PutObject(ctx, s.bucket, filepath, lr, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -112,61 +116,48 @@ func (s *S3Storage) WriteObject(filepath string, data io.Reader, size int64, per
|
|||
if _, err := tmpfile.Seek(0, 0); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
_, err = s.minioClient.PutObject(s.bucket, filepath, tmpfile, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
||||
_, err = s.minioClient.PutObject(ctx, s.bucket, filepath, tmpfile, size, minio.PutObjectOptions{ContentType: "application/octet-stream"})
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
func (s *S3Storage) DeleteObject(filepath string) error {
|
||||
return errors.WithStack(s.minioClient.RemoveObject(s.bucket, filepath))
|
||||
func (s *S3Storage) DeleteObject(ctx context.Context, filepath string) error {
|
||||
return errors.WithStack(s.minioClient.RemoveObject(ctx, s.bucket, filepath, minio.RemoveObjectOptions{}))
|
||||
}
|
||||
|
||||
func (s *S3Storage) List(prefix, startWith, delimiter string, doneCh <-chan struct{}) <-chan ObjectInfo {
|
||||
func (s *S3Storage) List(ctx context.Context, prefix, startAfter string, recursive bool) <-chan ObjectInfo {
|
||||
objectCh := make(chan ObjectInfo, 1)
|
||||
|
||||
if len(delimiter) > 1 {
|
||||
objectCh <- ObjectInfo{
|
||||
Err: errors.Errorf("wrong delimiter %q", delimiter),
|
||||
}
|
||||
return objectCh
|
||||
}
|
||||
|
||||
// remove leading slash
|
||||
prefix = strings.TrimPrefix(prefix, "/")
|
||||
startWith = strings.TrimPrefix(startWith, "/")
|
||||
startAfter = strings.TrimPrefix(startAfter, "/")
|
||||
|
||||
// Initiate list objects goroutine here.
|
||||
go func(objectCh chan<- ObjectInfo) {
|
||||
defer close(objectCh)
|
||||
// Save continuationToken for next request.
|
||||
var continuationToken string
|
||||
for {
|
||||
// Get list of objects a maximum of 1000 per request.
|
||||
result, err := s.minioCore.ListObjectsV2(s.bucket, prefix, continuationToken, false, delimiter, 1000, startWith)
|
||||
if err != nil {
|
||||
defer func() {
|
||||
if util.ContextCanceled(ctx) {
|
||||
objectCh <- ObjectInfo{
|
||||
Err: err,
|
||||
Err: ctx.Err(),
|
||||
}
|
||||
}
|
||||
close(objectCh)
|
||||
}()
|
||||
|
||||
for object := range s.minioClient.ListObjects(ctx, s.bucket, minio.ListObjectsOptions{Prefix: prefix, Recursive: recursive, StartAfter: startAfter}) {
|
||||
if object.Err != nil {
|
||||
select {
|
||||
case objectCh <- ObjectInfo{Err: object.Err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// If contents are available loop through and send over channel.
|
||||
for _, object := range result.Contents {
|
||||
select {
|
||||
// Send object content.
|
||||
case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}:
|
||||
// If receives done from the caller, return here.
|
||||
case <-doneCh:
|
||||
return
|
||||
}
|
||||
// minioClient.ListObject also returns common prefixes as objects, but we only want objects
|
||||
if strings.HasSuffix(object.Key, "/") {
|
||||
continue
|
||||
}
|
||||
|
||||
// If continuation token present, save it for next request.
|
||||
if result.NextContinuationToken != "" {
|
||||
continuationToken = result.NextContinuationToken
|
||||
}
|
||||
|
||||
// Listing ends result is not truncated, return right here.
|
||||
if !result.IsTruncated {
|
||||
select {
|
||||
case objectCh <- ObjectInfo{Path: object.Key, LastModified: object.LastModified, Size: object.Size}:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func (s *Configstore) maintenanceModeWatcher(ctx context.Context, runCtxCancel c
|
|||
type Configstore struct {
|
||||
log zerolog.Logger
|
||||
c *config.Configstore
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
d *db.DB
|
||||
lf lock.LockFactory
|
||||
ah *action.ActionHandler
|
||||
|
@ -89,7 +89,7 @@ func NewConfigstore(ctx context.Context, log zerolog.Logger, c *config.Configsto
|
|||
log = log.Level(zerolog.DebugLevel)
|
||||
}
|
||||
|
||||
ost, err := scommon.NewObjectStorage(&c.ObjectStorage)
|
||||
ost, err := scommon.NewObjectStorage(ctx, &c.ObjectStorage)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
|
|
@ -51,7 +51,7 @@ type Gateway struct {
|
|||
log zerolog.Logger
|
||||
c *config.Gateway
|
||||
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
runserviceClient *rsclient.Client
|
||||
configstoreClient *csclient.Client
|
||||
notificationClient *nsclient.Client
|
||||
|
@ -124,7 +124,7 @@ func NewGateway(ctx context.Context, log zerolog.Logger, gc *config.Config) (*Ga
|
|||
Key: c.CookieSigning.Key,
|
||||
})
|
||||
|
||||
ost, err := icommon.NewObjectStorage(&c.ObjectStorage)
|
||||
ost, err := icommon.NewObjectStorage(ctx, &c.ObjectStorage)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
|
|
@ -39,13 +39,13 @@ import (
|
|||
type ActionHandler struct {
|
||||
log zerolog.Logger
|
||||
d *db.DB
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
lf lock.LockFactory
|
||||
maintenanceMode bool
|
||||
maintenanceModeMutex sync.Mutex
|
||||
}
|
||||
|
||||
func NewActionHandler(log zerolog.Logger, d *db.DB, ost *objectstorage.ObjStorage, lf lock.LockFactory) *ActionHandler {
|
||||
func NewActionHandler(log zerolog.Logger, d *db.DB, ost objectstorage.ObjStorage, lf lock.LockFactory) *ActionHandler {
|
||||
return &ActionHandler{
|
||||
log: log,
|
||||
d: d,
|
||||
|
|
|
@ -87,10 +87,10 @@ func addHasMoreHeader(w http.ResponseWriter, hasMore bool) {
|
|||
type LogsHandler struct {
|
||||
log zerolog.Logger
|
||||
d *db.DB
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewLogsHandler(log zerolog.Logger, d *db.DB, ost *objectstorage.ObjStorage) *LogsHandler {
|
||||
func NewLogsHandler(log zerolog.Logger, d *db.DB, ost objectstorage.ObjStorage) *LogsHandler {
|
||||
return &LogsHandler{
|
||||
log: log,
|
||||
d: d,
|
||||
|
@ -192,7 +192,7 @@ func (h *LogsHandler) readTaskLogs(ctx context.Context, runID, taskID string, se
|
|||
} else {
|
||||
logPath = store.OSTRunTaskStepLogPath(task.ID, step)
|
||||
}
|
||||
f, err := h.ost.ReadObject(logPath)
|
||||
f, err := h.ost.ReadObject(ctx, logPath)
|
||||
if err != nil {
|
||||
if objectstorage.IsNotExist(err) {
|
||||
return true, util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -303,10 +303,10 @@ func sendLogs(w http.ResponseWriter, r io.Reader) error {
|
|||
type LogsDeleteHandler struct {
|
||||
log zerolog.Logger
|
||||
d *db.DB
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewLogsDeleteHandler(log zerolog.Logger, d *db.DB, ost *objectstorage.ObjStorage) *LogsDeleteHandler {
|
||||
func NewLogsDeleteHandler(log zerolog.Logger, d *db.DB, ost objectstorage.ObjStorage) *LogsDeleteHandler {
|
||||
return &LogsDeleteHandler{
|
||||
log: log,
|
||||
d: d,
|
||||
|
@ -404,7 +404,7 @@ func (h *LogsDeleteHandler) deleteTaskLogs(ctx context.Context, runID, taskID st
|
|||
} else {
|
||||
logPath = store.OSTRunTaskStepLogPath(task.ID, step)
|
||||
}
|
||||
err := h.ost.DeleteObject(logPath)
|
||||
err := h.ost.DeleteObject(ctx, logPath)
|
||||
if err != nil {
|
||||
if objectstorage.IsNotExist(err) {
|
||||
return util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -1005,10 +1005,10 @@ func (h *RunTaskActionsHandler) do(r *http.Request) error {
|
|||
type RunEventsHandler struct {
|
||||
log zerolog.Logger
|
||||
d *db.DB
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewRunEventsHandler(log zerolog.Logger, d *db.DB, ost *objectstorage.ObjStorage) *RunEventsHandler {
|
||||
func NewRunEventsHandler(log zerolog.Logger, d *db.DB, ost objectstorage.ObjStorage) *RunEventsHandler {
|
||||
return &RunEventsHandler{
|
||||
log: log,
|
||||
d: d,
|
||||
|
|
|
@ -360,10 +360,10 @@ func (h *ExecutorTasksHandler) do(r *http.Request) ([]*rsapitypes.ExecutorTask,
|
|||
|
||||
type ArchivesHandler struct {
|
||||
log zerolog.Logger
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewArchivesHandler(log zerolog.Logger, ost *objectstorage.ObjStorage) *ArchivesHandler {
|
||||
func NewArchivesHandler(log zerolog.Logger, ost objectstorage.ObjStorage) *ArchivesHandler {
|
||||
return &ArchivesHandler{
|
||||
log: log,
|
||||
ost: ost,
|
||||
|
@ -380,6 +380,7 @@ func (h *ArchivesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
func (h *ArchivesHandler) do(w http.ResponseWriter, r *http.Request) error {
|
||||
// TODO(sgotti) Check authorized call from executors
|
||||
ctx := r.Context()
|
||||
|
||||
taskID := r.URL.Query().Get("taskid")
|
||||
if taskID == "" {
|
||||
|
@ -396,7 +397,7 @@ func (h *ArchivesHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
if err := h.readArchive(taskID, step, w); err != nil {
|
||||
if err := h.readArchive(ctx, taskID, step, w); err != nil {
|
||||
switch {
|
||||
case util.APIErrorIs(err, util.ErrNotExist):
|
||||
return util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -408,9 +409,9 @@ func (h *ArchivesHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error {
|
||||
func (h *ArchivesHandler) readArchive(ctx context.Context, rtID string, step int, w io.Writer) error {
|
||||
archivePath := store.OSTRunTaskArchivePath(rtID, step)
|
||||
f, err := h.ost.ReadObject(archivePath)
|
||||
f, err := h.ost.ReadObject(ctx, archivePath)
|
||||
if err != nil {
|
||||
if objectstorage.IsNotExist(err) {
|
||||
return util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -427,10 +428,10 @@ func (h *ArchivesHandler) readArchive(rtID string, step int, w io.Writer) error
|
|||
|
||||
type CacheHandler struct {
|
||||
log zerolog.Logger
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewCacheHandler(log zerolog.Logger, ost *objectstorage.ObjStorage) *CacheHandler {
|
||||
func NewCacheHandler(log zerolog.Logger, ost objectstorage.ObjStorage) *CacheHandler {
|
||||
return &CacheHandler{
|
||||
log: log,
|
||||
ost: ost,
|
||||
|
@ -446,6 +447,7 @@ func (h *CacheHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (h *CacheHandler) do(w http.ResponseWriter, r *http.Request) error {
|
||||
ctx := r.Context()
|
||||
vars := mux.Vars(r)
|
||||
// TODO(sgotti) Check authorized call from executors
|
||||
|
||||
|
@ -460,7 +462,7 @@ func (h *CacheHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
query := r.URL.Query()
|
||||
_, prefix := query["prefix"]
|
||||
|
||||
matchedKey, err := matchCache(h.ost, key, prefix)
|
||||
matchedKey, err := matchCache(ctx, h.ost, key, prefix)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -474,7 +476,7 @@ func (h *CacheHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
if err := h.readCache(matchedKey, w); err != nil {
|
||||
if err := h.readCache(ctx, matchedKey, w); err != nil {
|
||||
switch {
|
||||
case util.APIErrorIs(err, util.ErrNotExist):
|
||||
return util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -486,16 +488,13 @@ func (h *CacheHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string, error) {
|
||||
func matchCache(ctx context.Context, ost objectstorage.ObjStorage, key string, prefix bool) (string, error) {
|
||||
cachePath := store.OSTCachePath(key)
|
||||
|
||||
if prefix {
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
|
||||
// get the latest modified object
|
||||
var lastObject *objectstorage.ObjectInfo
|
||||
for object := range ost.List(store.OSTCacheDir()+"/"+key, "", false, doneCh) {
|
||||
for object := range ost.List(ctx, store.OSTCacheDir()+"/"+key, "", false) {
|
||||
if object.Err != nil {
|
||||
return "", errors.WithStack(object.Err)
|
||||
}
|
||||
|
@ -512,7 +511,7 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string,
|
|||
return store.OSTCacheKey(lastObject.Path), nil
|
||||
}
|
||||
|
||||
_, err := ost.Stat(cachePath)
|
||||
_, err := ost.Stat(ctx, cachePath)
|
||||
if objectstorage.IsNotExist(err) {
|
||||
return "", nil
|
||||
}
|
||||
|
@ -522,9 +521,9 @@ func matchCache(ost *objectstorage.ObjStorage, key string, prefix bool) (string,
|
|||
return key, nil
|
||||
}
|
||||
|
||||
func (h *CacheHandler) readCache(key string, w io.Writer) error {
|
||||
func (h *CacheHandler) readCache(ctx context.Context, key string, w io.Writer) error {
|
||||
cachePath := store.OSTCachePath(key)
|
||||
f, err := h.ost.ReadObject(cachePath)
|
||||
f, err := h.ost.ReadObject(ctx, cachePath)
|
||||
if err != nil {
|
||||
if objectstorage.IsNotExist(err) {
|
||||
return util.NewAPIErrorWrap(util.ErrNotExist, err)
|
||||
|
@ -541,10 +540,10 @@ func (h *CacheHandler) readCache(key string, w io.Writer) error {
|
|||
|
||||
type CacheCreateHandler struct {
|
||||
log zerolog.Logger
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
}
|
||||
|
||||
func NewCacheCreateHandler(log zerolog.Logger, ost *objectstorage.ObjStorage) *CacheCreateHandler {
|
||||
func NewCacheCreateHandler(log zerolog.Logger, ost objectstorage.ObjStorage) *CacheCreateHandler {
|
||||
return &CacheCreateHandler{
|
||||
log: log,
|
||||
ost: ost,
|
||||
|
@ -560,6 +559,7 @@ func (h *CacheCreateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
func (h *CacheCreateHandler) do(w http.ResponseWriter, r *http.Request) error {
|
||||
ctx := r.Context()
|
||||
vars := mux.Vars(r)
|
||||
// TODO(sgotti) Check authorized call from executors
|
||||
|
||||
|
@ -574,7 +574,7 @@ func (h *CacheCreateHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
|
||||
matchedKey, err := matchCache(h.ost, key, false)
|
||||
matchedKey, err := matchCache(ctx, h.ost, key, false)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ func (h *CacheCreateHandler) do(w http.ResponseWriter, r *http.Request) error {
|
|||
}
|
||||
|
||||
cachePath := store.OSTCachePath(key)
|
||||
if err := h.ost.WriteObject(cachePath, r.Body, size, false); err != nil {
|
||||
if err := h.ost.WriteObject(ctx, cachePath, r.Body, size, false); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
|
|
@ -76,7 +76,7 @@ func (s *Runservice) maintenanceModeWatcher(ctx context.Context, runCtxCancel co
|
|||
type Runservice struct {
|
||||
log zerolog.Logger
|
||||
c *config.Runservice
|
||||
ost *objectstorage.ObjStorage
|
||||
ost objectstorage.ObjStorage
|
||||
d *db.DB
|
||||
lf lock.LockFactory
|
||||
ah *action.ActionHandler
|
||||
|
@ -88,7 +88,7 @@ func NewRunservice(ctx context.Context, log zerolog.Logger, c *config.Runservice
|
|||
log = log.Level(zerolog.DebugLevel)
|
||||
}
|
||||
|
||||
ost, err := scommon.NewObjectStorage(&c.ObjectStorage)
|
||||
ost, err := scommon.NewObjectStorage(ctx, &c.ObjectStorage)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
|
|
@ -297,10 +297,10 @@ func TestLogleaner(t *testing.T) {
|
|||
body := io.NopCloser(bytes.NewBufferString("log test"))
|
||||
logPath := store.OSTRunTaskStepLogPath("task01", 0)
|
||||
|
||||
err := rs.ost.WriteObject(logPath, body, -1, false)
|
||||
err := rs.ost.WriteObject(ctx, logPath, body, -1, false)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
_, err = rs.ost.ReadObject(logPath)
|
||||
_, err = rs.ost.ReadObject(ctx, logPath)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
|
@ -308,7 +308,7 @@ func TestLogleaner(t *testing.T) {
|
|||
err = rs.objectsCleaner(ctx, store.OSTLogsBaseDir(), common.LogCleanerLockKey, 1*time.Millisecond)
|
||||
testutil.NilError(t, err)
|
||||
|
||||
_, err = rs.ost.ReadObject(logPath)
|
||||
_, err = rs.ost.ReadObject(ctx, logPath)
|
||||
assert.ErrorType(t, err, objectstorage.IsNotExist)
|
||||
}
|
||||
|
||||
|
|
|
@ -1014,8 +1014,8 @@ func (s *Runservice) runTasksUpdater(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Runservice) OSTFileExists(path string) (bool, error) {
|
||||
_, err := s.ost.Stat(path)
|
||||
func (s *Runservice) OSTFileExists(ctx context.Context, path string) (bool, error) {
|
||||
_, err := s.ost.Stat(ctx, path)
|
||||
if err != nil && !objectstorage.IsNotExist(err) {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
@ -1063,7 +1063,7 @@ func (s *Runservice) fetchLog(ctx context.Context, runID string, rt *types.RunTa
|
|||
} else {
|
||||
logPath = store.OSTRunTaskStepLogPath(rt.ID, stepnum)
|
||||
}
|
||||
ok, err := s.OSTFileExists(logPath)
|
||||
ok, err := s.OSTFileExists(ctx, logPath)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -1108,7 +1108,7 @@ func (s *Runservice) fetchLog(ctx context.Context, runID string, rt *types.RunTa
|
|||
}
|
||||
}
|
||||
|
||||
return errors.WithStack(s.ost.WriteObject(logPath, resp.Body, size, false))
|
||||
return errors.WithStack(s.ost.WriteObject(ctx, logPath, resp.Body, size, false))
|
||||
}
|
||||
|
||||
func (s *Runservice) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
|
||||
|
@ -1277,7 +1277,7 @@ func (s *Runservice) fetchArchive(ctx context.Context, runID string, rt *types.R
|
|||
}
|
||||
|
||||
path := store.OSTRunTaskArchivePath(rt.ID, stepnum)
|
||||
ok, err := s.OSTFileExists(path)
|
||||
ok, err := s.OSTFileExists(ctx, path)
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
@ -1319,7 +1319,7 @@ func (s *Runservice) fetchArchive(ctx context.Context, runID string, rt *types.R
|
|||
}
|
||||
}
|
||||
|
||||
return errors.WithStack(s.ost.WriteObject(path, resp.Body, size, false))
|
||||
return errors.WithStack(s.ost.WriteObject(ctx, path, resp.Body, size, false))
|
||||
}
|
||||
|
||||
func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {
|
||||
|
@ -1399,22 +1399,22 @@ func (s *Runservice) taskFetcher(ctx context.Context, r *types.Run, rt *types.Ru
|
|||
|
||||
// write related logs runID
|
||||
runIDPath := store.OSTRunTaskLogsRunPath(rt.ID, r.ID)
|
||||
exists, err := s.OSTFileExists(runIDPath)
|
||||
exists, err := s.OSTFileExists(ctx, runIDPath)
|
||||
if err != nil {
|
||||
s.log.Err(err).Send()
|
||||
} else if !exists {
|
||||
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||
if err := s.ost.WriteObject(ctx, runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||
s.log.Err(err).Send()
|
||||
}
|
||||
}
|
||||
|
||||
// write related archives runID
|
||||
runIDPath = store.OSTRunTaskArchivesRunPath(rt.ID, r.ID)
|
||||
exists, err = s.OSTFileExists(runIDPath)
|
||||
exists, err = s.OSTFileExists(ctx, runIDPath)
|
||||
if err != nil {
|
||||
s.log.Err(err).Send()
|
||||
} else if !exists {
|
||||
if err := s.ost.WriteObject(runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||
if err := s.ost.WriteObject(ctx, runIDPath, bytes.NewReader([]byte{}), 0, false); err != nil {
|
||||
s.log.Err(err).Send()
|
||||
}
|
||||
}
|
||||
|
@ -1604,14 +1604,12 @@ func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.
|
|||
}
|
||||
defer func() { _ = l.Unlock() }()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
for object := range s.ost.List(store.OSTCacheDir()+"/", "", true, doneCh) {
|
||||
for object := range s.ost.List(ctx, store.OSTCacheDir()+"/", "", true) {
|
||||
if object.Err != nil {
|
||||
return object.Err
|
||||
}
|
||||
if object.LastModified.Add(cacheExpireInterval).Before(time.Now()) {
|
||||
if err := s.ost.DeleteObject(object.Path); err != nil {
|
||||
if err := s.ost.DeleteObject(ctx, object.Path); err != nil {
|
||||
if !objectstorage.IsNotExist(err) {
|
||||
s.log.Warn().Err(err).Msgf("failed to delete cache object %q", object.Path)
|
||||
}
|
||||
|
@ -1663,14 +1661,12 @@ func (s *Runservice) objectsCleaner(ctx context.Context, prefix string, lockKey
|
|||
}
|
||||
defer func() { _ = l.Unlock() }()
|
||||
|
||||
doneCh := make(chan struct{})
|
||||
defer close(doneCh)
|
||||
for object := range s.ost.List(prefix+"/", "", true, doneCh) {
|
||||
for object := range s.ost.List(ctx, prefix+"/", "", true) {
|
||||
if object.Err != nil {
|
||||
return object.Err
|
||||
}
|
||||
if object.LastModified.Add(objectExpireInterval).Before(time.Now()) {
|
||||
if err := s.ost.DeleteObject(object.Path); err != nil {
|
||||
if err := s.ost.DeleteObject(ctx, object.Path); err != nil {
|
||||
if !objectstorage.IsNotExist(err) {
|
||||
s.log.Warn().Err(err).Msgf("failed to delete object %q", object.Path)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,13 @@
|
|||
package util
|
||||
|
||||
import "context"
|
||||
|
||||
// ContextCanceled returns whether a context is canceled.
|
||||
func ContextCanceled(ctx context.Context) bool {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue