Browse Source

Merge pull request #2476 from mnaamani/storage-node/fix-write-stream

storage-node: ensure write stream flushed correctly
shamil-gadelshin 3 years ago
parent
commit
b8ef965c29

+ 1 - 1
.github/workflows/run-network-tests.yml

@@ -193,7 +193,7 @@ jobs:
           docker-compose up -d joystream-node
       - name: Configure and start development storage node
         run: |
-          DEBUG=* yarn storage-cli dev-init
+          DEBUG=joystream:* yarn storage-cli dev-init
           docker-compose up -d colossus
       - name: Test uploading
         run: |

+ 10 - 3
storage-node/packages/colossus/paths/asset/v0/{id}.js

@@ -25,7 +25,10 @@ const assert = require('assert')
 
 function errorHandler(response, err, code) {
   debug(err)
-  response.status(err.code || code || 500).send({ message: err.toString() })
+  // Some error types (for example, those coming from the ipfs node) don't carry a valid HTTP status code
+  const statusCode = typeof err.code === 'number' ? err.code : code
+  response.status(statusCode || 500).send({ message: err.toString() })
+  response.end()
 }
 
 // The maximum total estimated balance that will be spent submitting transactions
@@ -143,7 +146,7 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           }
         })
 
-        stream.on('finish', async () => {
+        stream.on('end', async () => {
           if (!aborted) {
             try {
               // try to get file info and compute ipfs hash before committing the stream to ipfs node.
@@ -206,7 +209,11 @@ module.exports = function (storage, runtime, ipfsHttpGatewayUrl, anonymous) {
           }
         })
 
-        stream.on('error', (err) => errorHandler(res, err))
+        stream.on('error', (err) => {
+          stream.end()
+          stream.cleanup()
+          errorHandler(res, err)
+        })
         req.pipe(stream)
       } catch (err) {
         errorHandler(res, err)

+ 26 - 16
storage-node/packages/storage/storage.js

@@ -90,6 +90,10 @@ class StorageWriteStream extends Transform {
 
     // Create temp target.
     this.temp = temp.createWriteStream()
+    this.temp.on('error', (err) => this.emit('error', err))
+
+    // Small temporary buffer storing first fileType.minimumBytes of stream
+    // used for early file type detection
     this.buf = Buffer.alloc(0)
   }
 
@@ -99,13 +103,11 @@ class StorageWriteStream extends Transform {
       chunk = Buffer.from(chunk)
     }
 
-    this.temp.write(chunk)
-
     // Try to detect file type during streaming.
     if (!this.fileInfo && this.buf.byteLength <= fileType.minimumBytes) {
       this.buf = Buffer.concat([this.buf, chunk])
 
-      if (this.buf >= fileType.minimumBytes) {
+      if (this.buf.byteLength >= fileType.minimumBytes) {
         const info = fileType(this.buf)
         // No info? We will try again at the end of the stream.
         if (info) {
@@ -115,13 +117,26 @@ class StorageWriteStream extends Transform {
       }
     }
 
-    callback(null)
+    // Waiting for every single write to be flushed can be slow, so the callback form below is left disabled:
+    // this.temp.write(chunk, (err) => {
+    //   callback(err)
+    // })
+
+    // Respect backpressure: wait for 'drain' when the temp stream is full (write errors surface via its 'error' event)
+    if (!this.temp.write(chunk)) {
+      this.temp.once('drain', () => callback(null))
+    } else {
+      process.nextTick(() => callback(null))
+    }
   }
 
   _flush(callback) {
     debug('Flushing temporary stream:', this.temp.path)
-    this.temp.end()
-    callback(null)
+    this.temp.end(() => {
+      debug('flushed!')
+      callback(null)
+      this.emit('end')
+    })
   }
 
   /*
@@ -183,6 +198,8 @@ class StorageWriteStream extends Transform {
    * Clean up temporary data.
    */
   cleanup() {
+    // Make it safe to call cleanup more than once
+    if (!this.temp) return
     debug('Cleaning up temporary file: ', this.temp.path)
     fs.unlink(this.temp.path, () => {
       /* Ignore errors. */
@@ -333,22 +350,15 @@ class Storage {
 
     // Write stream
     if (mode === 'w') {
-      return await this.createWriteStream(contentId, timeout)
+      return this.createWriteStream(contentId, timeout)
     }
 
     // Read stream - with file type detection
     return await this.createReadStream(contentId, timeout)
   }
 
-  async createWriteStream() {
-    // IPFS wants us to just dump a stream into its storage, then returns a
-    // content ID (of its own).
-    // We need to instead return a stream immediately, that we eventually
-    // decorate with the content ID when that's available.
-    return new Promise((resolve) => {
-      const stream = new StorageWriteStream(this)
-      resolve(stream)
-    })
+  createWriteStream() {
+    return new StorageWriteStream(this)
   }
 
   async createReadStream(contentId, timeout) {

+ 39 - 34
storage-node/packages/storage/test/storage.js

@@ -34,7 +34,7 @@ function write(store, contentId, contents, callback) {
   store
     .open(contentId, 'w')
     .then((stream) => {
-      stream.on('finish', () => {
+      stream.on('end', () => {
         stream.commit()
       })
       stream.on('committed', callback)
@@ -90,39 +90,44 @@ describe('storage/storage', () => {
       })
     })
 
-    // it('detects the MIME type of a write stream', (done) => {
-    // 	const contents = fs.readFileSync('../../storage-node_new.svg')
-    // 	storage
-    // 		.open('mime-test', 'w')
-    // 		.then((stream) => {
-    // 			let fileInfo
-    // 			stream.on('fileInfo', (info) => {
-    // 				// Could filter & abort here now, but we're just going to set this,
-    // 				// and expect it to be set later...
-    // 				fileInfo = info
-    // 			})
-    //
-    // 			stream.on('finish', () => {
-    // 				stream.commit()
-    // 			})
-    //
-    // 			stream.on('committed', () => {
-    // 				// ... if fileInfo is not set here, there's an issue.
-    // 				expect(fileInfo).to.have.property('mimeType', 'application/xml')
-    // 				expect(fileInfo).to.have.property('ext', 'xml')
-    // 				done()
-    // 			})
-    //
-    // 			if (!stream.write(contents)) {
-    // 				stream.once('drain', () => stream.end())
-    // 			} else {
-    // 				process.nextTick(() => stream.end())
-    // 			}
-    // 		})
-    // 		.catch((err) => {
-    // 			expect.fail(err)
-    // 		})
-    // })
+    it('detects the MIME type of a write stream', (done) => {
+      const contents = fs.readFileSync('../../storage-node_new.svg')
+      storage
+        .open('mime-test', 'w')
+        .then((stream) => {
+          let fileInfo
+          stream.on('fileInfo', (info) => {
+            // Could filter & abort here now, but we're just going to set this,
+            // and expect it to be set later...
+            fileInfo = info
+          })
+
+          stream.on('end', () => {
+            stream.info()
+          })
+
+          stream.once('info', async (info) => {
+            fileInfo = info
+            stream.commit()
+          })
+
+          stream.on('committed', () => {
+            // ... if fileInfo is not set here, there's an issue.
+            expect(fileInfo).to.have.property('mimeType', 'application/xml')
+            expect(fileInfo).to.have.property('ext', 'xml')
+            done()
+          })
+
+          if (!stream.write(contents)) {
+            stream.once('drain', () => stream.end())
+          } else {
+            process.nextTick(() => stream.end())
+          }
+        })
+        .catch((err) => {
+          expect.fail(err)
+        })
+    })
 
     it('can read a stream', (done) => {
       const contents = 'test-for-reading'