storage.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict'
  19. const { Transform } = require('stream')
  20. const fs = require('fs')
  21. const debug = require('debug')('joystream:storage:storage')
  22. const Promise = require('bluebird')
  23. const Hash = require('ipfs-only-hash')
  24. Promise.config({
  25. cancellation: true,
  26. })
  27. const fileType = require('file-type')
  28. const ipfsClient = require('ipfs-http-client')
  29. const temp = require('temp').track()
  30. const _ = require('lodash')
  31. // Default request timeout; imposed on top of the IPFS client, because the
  32. // client doesn't seem to care.
  33. const DEFAULT_TIMEOUT = 30 * 1000
  34. // Default/dummy resolution implementation.
  35. const DEFAULT_RESOLVE_CONTENT_ID = async (original) => {
  36. debug('Warning: Default resolution returns original CID', original)
  37. return original
  38. }
  39. // Default file info if nothing could be detected.
  40. const DEFAULT_FILE_INFO = {
  41. mimeType: 'application/octet-stream',
  42. ext: 'bin',
  43. }
  44. /*
  45. * fileType is a weird name, because we're really looking at MIME types.
  46. * Also, the type field includes extension info, so we're going to call
  47. * it fileInfo { mimeType, ext } instead.
  48. * Nitpicking, but it also means we can add our default type if things
  49. * go wrong.
  50. */
  51. function fixFileInfo(info) {
  52. if (!info) {
  53. info = DEFAULT_FILE_INFO
  54. } else {
  55. info.mimeType = info.mime
  56. delete info.mime
  57. }
  58. return info
  59. }
  60. function fixFileInfoOnStream(stream) {
  61. const info = fixFileInfo(stream.fileType)
  62. delete stream.fileType
  63. stream.fileInfo = info
  64. return stream
  65. }
  66. /*
  67. * Internal Transform stream for helping write to a temporary location, adding
  68. * MIME type detection, and a commit() function.
  69. */
  70. class StorageWriteStream extends Transform {
  71. constructor(storage, options) {
  72. options = _.clone(options || {})
  73. super(options)
  74. this.storage = storage
  75. // Create temp target.
  76. this.temp = temp.createWriteStream()
  77. this.temp.on('error', (err) => this.emit('error', err))
  78. // Small temporary buffer storing first fileType.minimumBytes of stream
  79. // used for early file type detection
  80. this.buf = Buffer.alloc(0)
  81. }
  82. _transform(chunk, encoding, callback) {
  83. // Deal with buffers only
  84. if (typeof chunk === 'string') {
  85. chunk = Buffer.from(chunk)
  86. }
  87. // Try to detect file type during streaming.
  88. if (!this.fileInfo && this.buf.byteLength <= fileType.minimumBytes) {
  89. this.buf = Buffer.concat([this.buf, chunk])
  90. if (this.buf.byteLength >= fileType.minimumBytes) {
  91. const info = fileType(this.buf)
  92. // No info? We will try again at the end of the stream.
  93. if (info) {
  94. this.fileInfo = fixFileInfo(info)
  95. this.emit('fileInfo', this.fileInfo)
  96. }
  97. }
  98. }
  99. // Always waiting for write flush can be slow..
  100. // this.temp.write(chunk, (err) => {
  101. // callback(err)
  102. // })
  103. // Respect backpressure and handle write error
  104. if (!this.temp.write(chunk)) {
  105. this.temp.once('drain', () => callback(null))
  106. } else {
  107. process.nextTick(() => callback(null))
  108. }
  109. }
  110. _flush(callback) {
  111. debug('Flushing temporary stream:', this.temp.path)
  112. this.temp.end(() => {
  113. debug('flushed!')
  114. callback(null)
  115. this.emit('end')
  116. })
  117. }
  118. /*
  119. * Get file info
  120. */
  121. async info() {
  122. if (!this.temp) {
  123. throw new Error('Cannot get info on temporary stream that does not exist. Did you call cleanup()?')
  124. }
  125. if (!this.fileInfo) {
  126. const read = fs.createReadStream(this.temp.path)
  127. const stream = await fileType.stream(read)
  128. this.fileInfo = fixFileInfoOnStream(stream).fileInfo
  129. }
  130. if (!this.hash) {
  131. const read = fs.createReadStream(this.temp.path)
  132. this.hash = await Hash.of(read)
  133. }
  134. this.emit('info', this.fileInfo, this.hash)
  135. return {
  136. info: this.fileInfo,
  137. hash: this.hash,
  138. }
  139. }
  140. /*
  141. * Commit this stream to the IPFS backend.
  142. */
  143. commit() {
  144. if (!this.temp) {
  145. throw new Error('Cannot commit a temporary stream that does not exist. Did you call cleanup()?')
  146. }
  147. debug('Committing temporary stream: ', this.temp.path)
  148. this.storage.ipfs
  149. .addFromFs(this.temp.path)
  150. .then(async (result) => {
  151. const hash = result[0].hash
  152. debug('Stream committed as', hash)
  153. this.emit('committed', hash)
  154. await this.storage.ipfs.pin.add(hash)
  155. this.cleanup()
  156. })
  157. .catch((err) => {
  158. debug('Error committing stream', err)
  159. this.emit('error', err)
  160. this.cleanup()
  161. })
  162. }
  163. /*
  164. * Clean up temporary data.
  165. */
  166. cleanup() {
  167. // Make it safe to call cleanup more than once
  168. if (!this.temp) return
  169. debug('Cleaning up temporary file: ', this.temp.path)
  170. fs.unlink(this.temp.path, () => {
  171. /* Ignore errors. */
  172. })
  173. delete this.temp
  174. }
  175. }
  176. /*
  177. * Manages the storage backend interaction. This provides a Promise-based API.
  178. *
  179. * Usage:
  180. *
  181. * const store = await Storage.create({ ... });
  182. * store.open(...);
  183. */
  184. class Storage {
  185. /*
  186. * Create a Storage instance. Options include:
  187. *
  188. * - an `ipfs` property, which is itself a hash containing
  189. * - `connect_options` to be passed to the IPFS client library for
  190. * connecting to an IPFS node.
  191. * - a `resolve_content_id` function, which translates Joystream
  192. * content IDs to IPFS content IDs or vice versa. The default is to
  193. * not perform any translation, which is not practical for a production
  194. * system, but serves its function during development and testing. The
  195. * function must be asynchronous.
  196. * - a `timeout` parameter, defaulting to DEFAULT_TIMEOUT. After this time,
  197. * requests to the IPFS backend time out.
  198. *
  199. * Functions in this class accept an optional timeout parameter. If the
  200. * timeout is given, it is used - otherwise, the `option.timeout` value
  201. * above is used.
  202. */
  203. static create(options) {
  204. const storage = new Storage()
  205. storage._init(options)
  206. return storage
  207. }
  208. _init(options) {
  209. this.options = _.clone(options || {})
  210. this.options.ipfs = this.options.ipfs || {}
  211. this._timeout = this.options.timeout || DEFAULT_TIMEOUT
  212. this._resolve_content_id = this.options.resolve_content_id || DEFAULT_RESOLVE_CONTENT_ID
  213. this.ipfs = ipfsClient(this.options.ipfsHost || 'localhost', '5001', { protocol: 'http' })
  214. this.pinned = {}
  215. this.pinning = {}
  216. this.ipfs.id((err, identity) => {
  217. if (err) {
  218. debug(`Warning IPFS daemon not running: ${err.message}`)
  219. } else {
  220. debug(`IPFS node is up with identity: ${identity.id}`)
  221. // TODO: wait for IPFS daemon to be online for this to be effective..?
  222. // set the IPFS HTTP Gateway config we desire.. operator might need
  223. // to restart their daemon if the config was changed.
  224. this.ipfs.config.set('Gateway.PublicGateways', { 'localhost': null })
  225. }
  226. })
  227. }
  228. /*
  229. * Uses bluebird's timeout mechanism to return a Promise that times out after
  230. * the given timeout interval, and tries to execute the given operation within
  231. * that time.
  232. */
  233. async withSpecifiedTimeout(timeout, operation) {
  234. // TODO: rewrite this method to async-await style
  235. // eslint-disable-next-line no-async-promise-executor
  236. return new Promise(async (resolve, reject) => {
  237. try {
  238. resolve(await new Promise(operation))
  239. } catch (err) {
  240. reject(err)
  241. }
  242. }).timeout(timeout || this._timeout)
  243. }
  244. /*
  245. * Resolve content ID with timeout.
  246. */
  247. async resolveContentIdWithTimeout(timeout, contentId) {
  248. return await this.withSpecifiedTimeout(timeout, async (resolve, reject) => {
  249. try {
  250. resolve(await this._resolve_content_id(contentId))
  251. } catch (err) {
  252. reject(err)
  253. }
  254. })
  255. }
  256. /*
  257. * Stat a content ID.
  258. */
  259. async stat(contentId, timeout) {
  260. const ipfsHash = await this.resolveContentIdWithTimeout(timeout, contentId)
  261. return this.ipfsStat(ipfsHash, timeout)
  262. }
  263. /*
  264. * Stat IPFS hash
  265. */
  266. async ipfsStat(hash, timeout) {
  267. return this.withSpecifiedTimeout(timeout, (resolve, reject) => {
  268. this.ipfs.files.stat(`/ipfs/${hash}`, { withLocal: true }, (err, res) => {
  269. if (err) {
  270. reject(err)
  271. return
  272. }
  273. resolve(res)
  274. })
  275. })
  276. }
  277. /*
  278. * Return the size of a content ID.
  279. */
  280. async size(contentId, timeout) {
  281. const stat = await this.stat(contentId, timeout)
  282. return stat.size
  283. }
  284. /*
  285. * Opens the specified content in read or write mode, and returns a Promise
  286. * with the stream.
  287. *
  288. * Read streams will contain a fileInfo property, with:
  289. * - a `mimeType` field providing the file's MIME type, or a default.
  290. * - an `ext` property, providing a file extension suggestion, or a default.
  291. *
  292. * Write streams have a slightly different flow, in order to allow for MIME
  293. * type detection and potential filtering. First off, they are written to a
  294. * temporary location, and only committed to the backend once their
  295. * `commit()` function is called.
  296. *
  297. * When the commit has finished, a `committed` event is emitted, which
  298. * contains the IPFS backend's content ID.
  299. *
  300. * Write streams also emit a `fileInfo` event during writing. It is passed
  301. * the `fileInfo` field as described above. Event listeners may now opt to
  302. * abort the write or continue and eventually `commit()` the file. There is
  303. * an explicit `cleanup()` function that removes temporary files as well,
  304. * in case comitting is not desired.
  305. */
  306. async open(contentId, mode, timeout) {
  307. if (mode !== 'r' && mode !== 'w') {
  308. throw Error('The only supported modes are "r", "w" and "a".')
  309. }
  310. // Write stream
  311. if (mode === 'w') {
  312. return this.createWriteStream(contentId, timeout)
  313. }
  314. // Read stream - with file type detection
  315. return await this.createReadStream(contentId, timeout)
  316. }
  317. createWriteStream() {
  318. return new StorageWriteStream(this)
  319. }
  320. async createReadStream(contentId, timeout) {
  321. const ipfsHash = await this.resolveContentIdWithTimeout(timeout, contentId)
  322. let found = false
  323. return await this.withSpecifiedTimeout(timeout, (resolve, reject) => {
  324. const ls = this.ipfs.getReadableStream(ipfsHash)
  325. ls.on('data', async (result) => {
  326. if (result.path === ipfsHash) {
  327. found = true
  328. const ftStream = await fileType.stream(result.content)
  329. resolve(fixFileInfoOnStream(ftStream))
  330. }
  331. })
  332. ls.on('error', (err) => {
  333. ls.end()
  334. debug(err)
  335. reject(err)
  336. })
  337. ls.on('end', () => {
  338. if (!found) {
  339. const err = new Error('No matching content found for', contentId)
  340. debug(err)
  341. reject(err)
  342. }
  343. })
  344. ls.resume()
  345. })
  346. }
  347. /*
  348. * Pin the given IPFS CID
  349. */
  350. async pin(ipfsHash, callback) {
  351. if (!this.pinning[ipfsHash] && !this.pinned[ipfsHash]) {
  352. // debug(`Pinning hash: ${ipfsHash} content-id: ${contentId}`)
  353. this.pinning[ipfsHash] = true
  354. // Callback passed to add() will be called on error or when the entire file
  355. // is retrieved. So on success we consider the content synced.
  356. this.ipfs.pin.add(ipfsHash, { quiet: true, pin: true }, (err) => {
  357. delete this.pinning[ipfsHash]
  358. if (err) {
  359. debug(`Error Pinning: ${ipfsHash}`)
  360. callback && callback(err)
  361. } else {
  362. // debug(`Pinned ${ipfsHash}`)
  363. this.pinned[ipfsHash] = true
  364. callback && callback(null, this.syncStatus(ipfsHash))
  365. }
  366. })
  367. } else {
  368. callback && callback(null, this.syncStatus(ipfsHash))
  369. }
  370. }
  371. syncStatus(ipfsHash) {
  372. return {
  373. syncing: this.pinning[ipfsHash] === true,
  374. synced: this.pinned[ipfsHash] === true,
  375. }
  376. }
  377. }
  378. module.exports = {
  379. Storage,
  380. }