ranges.js 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456
  1. /*
  2. * This file is part of the storage node for the Joystream project.
  3. * Copyright (C) 2019 Joystream Contributors
  4. *
  5. * This program is free software: you can redistribute it and/or modify
  6. * it under the terms of the GNU General Public License as published by
  7. * the Free Software Foundation, either version 3 of the License, or
  8. * (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program. If not, see <https://www.gnu.org/licenses/>.
  17. */
  18. 'use strict'
  19. const uuid = require('uuid')
  20. const streamBuf = require('stream-buffers')
  21. const debug = require('debug')('joystream:util:ranges')
  22. /*
  23. * Range parsing
  24. */
  /*
   * Parse a single range string, e.g. '0-100', '-100', '0-' or '*'.
   *
   * Returns a two-element array [start, end] of integers. A bound that is not
   * given in the string is undefined; '*' yields [undefined, undefined].
   *
   * A single-position range such as '5-5' is accepted: RFC 7233 only considers
   * a byte range invalid when its end is smaller than its start.
   *
   * Throws an Error if the input is not a string in one of the forms above, or
   * if the end is smaller than the start.
   */
  function _parse_range(range) {
    if (typeof range !== 'string' || !/^(\d+-\d+|\d+-|-\d+|\*)$/u.test(range)) {
      throw new Error(`Not a valid range: ${String(range)}`)
    }
    if (range === '*') {
      return [undefined, undefined]
    }
    // After validation there is exactly one '-', so the split always yields two
    // parts; an empty part means the bound was not provided.
    const [start, end] = range.split('-').map((v) => (v === '' ? undefined : parseInt(v, 10)))
    // Comparisons against undefined are always false, so open ranges pass.
    if (end < start) {
      throw new Error(`Invalid range: start "${start}" must not be after end "${end}".`)
    }
    return [start, end]
  }
  /*
   * Parse a range header value, e.g. unit=ranges, where ranges
   * are a comma separated list of individual ranges, and unit is any
   * custom unit string. If the unit (and equal sign) are not given, assume
   * 'bytes'.
   *
   * Returns an object with:
   *   - unit:     the unit string ('bytes' if none was given),
   *   - rangeStr: the raw, comma separated range list,
   *   - ranges:   an array of [start, end] pairs, sorted by start, in which
   *               overlapping and adjacent ranges have been merged. An
   *               open-ended range ('10-') has an undefined end. Suffix ranges
   *               ('-100', i.e. no start) cannot be positioned without knowing
   *               the content size; they are collapsed into a single
   *               [undefined, longest] entry placed last.
   *
   * Throws an Error if the value is not a valid range header.
   */
  function parse(rangeStr) {
    debug('Parse range header value:', rangeStr)
    if (typeof rangeStr !== 'string') {
      throw new Error(`Not a valid range header: ${String(rangeStr)}`)
    }
    const matches = rangeStr.match(/^(([^\s]+)=)?((?:(?:\d+-\d+|-\d+|\d+-),?)+)$/u)
    if (!matches) {
      throw new Error(`Not a valid range header: ${rangeStr}`)
    }
    const res = {
      unit: matches[2] || 'bytes',
      rangeStr: matches[3],
      ranges: [],
    }

    // Parse individual ranges, keeping suffix ranges apart from ranges that
    // have a start offset.
    const anchored = []
    let suffixLength
    res.rangeStr.split(',').forEach((rangeSpec) => {
      const range = _parse_range(rangeSpec)
      debug('Found range:', range)
      if (typeof range[0] !== 'undefined') {
        anchored.push(range)
      } else if (typeof suffixLength === 'undefined' || range[1] > suffixLength) {
        // All suffix ranges end at the end of the content, so the longest one
        // covers all the others.
        suffixLength = range[1]
      }
    })

    // Sort by start, then merge in a single pass. Because of the ordering, a
    // range can only ever overlap the most recently emitted one, which also
    // covers ranges that bridge two previously separate ranges.
    anchored.sort((first, second) => first[0] - second[0])
    anchored.forEach((range) => {
      const last = res.ranges[res.ranges.length - 1]
      const lastIsOpen = Boolean(last) && typeof last[1] === 'undefined'
      if (last && (lastIsOpen || range[0] <= last[1] + 1)) {
        // Overlapping or adjacent: extend the previous range. An open-ended
        // range swallows everything that starts after it.
        if (!lastIsOpen) {
          last[1] = typeof range[1] === 'undefined' ? undefined : Math.max(last[1], range[1])
        }
        debug('Merged', range, 'as', last)
      } else {
        debug('Non-overlapping range!')
        res.ranges.push([range[0], range[1]])
      }
    })

    if (typeof suffixLength !== 'undefined') {
      res.ranges.push([undefined, suffixLength])
    }

    debug('Result of parse is', res)
    return res
  }
  /*
   * Callback version of parse().
   *
   * Invokes cb(result) when parsing succeeds, or cb(null, err) when parse()
   * throws. The return value of the callback is returned.
   *
   * The callback is invoked outside of the try block on success, so an
   * exception thrown by the callback itself propagates to the caller instead of
   * causing the callback to be invoked a second time with that exception.
   */
  function parseAsync(rangeStr, cb) {
    let parsed
    try {
      parsed = parse(rangeStr)
    } catch (err) {
      return cb(null, err)
    }
    return cb(parsed)
  }
  108. /*
  109. * Range streaming
  110. */
  /*
   * The class writes parts specified in the options to the response. If no
   * ranges are specified, the entire stream is written. At the end, the given
   * callback is invoked - if an error occurred, it is invoked with an error
   * parameter.
   *
   * Constructor parameters:
   *   - response:     the HTTP response to write to. The code calls status(),
   *                   send(), header(), contentType(), writeHead(), write(),
   *                   end() and reads headersSent on it.
   *   - stream:       the readable stream providing the content; 'open',
   *                   'data', 'end' and 'error' events are listened to.
   *   - opts:         name, type, size, download and ranges (ranges being the
   *                   result object of this file's parse()).
   *   - end_callback: optional; invoked without arguments at the end of the
   *                   stream, or with the error if one occurred.
   *
   * Note that the range implementation can be optimized for streams that
   * support seeking.
   *
   * There's another optimization here for when sizes are given, which is
   * possible with file system based streams. We'll see how likely that's going
   * to be in future.
   */
  class RangeSender {
    constructor(response, stream, opts, end_callback) {
      // Options
      this.name = opts.name || 'content.bin'
      this.type = opts.type || 'application/octet-stream'
      this.size = opts.size
      this.ranges = opts.ranges
      this.download = opts.download || false

      // Range handling related state.
      this.read_offset = 0 // Nothing read so far
      this.rangeIndex = -1 // No range index yet.
      this.range_boundary = undefined // Generate boundary when needed.

      // Event handlers & state
      this.handlers = {}
      this.opened = false

      debug('RangeSender:', this)
      if (opts.ranges) {
        debug('Parsed ranges:', opts.ranges.ranges)
      }

      // Parameters
      this.response = response
      this.stream = stream
      this.opts = opts
      this.end_callback = end_callback
    }

    /*
     * Stream error handler. Responds with an error status if nothing was sent
     * yet, otherwise terminates the response, then invokes the end callback
     * with the error.
     */
    on_error(err) {
      // Assume hiding the actual error is best, and default to 404.
      debug('Error:', err)
      if (!this.response.headersSent) {
        // Stream errors usually carry string codes such as 'ENOENT'; only use
        // the code as the response status when it is a valid HTTP status.
        const hasHttpStatus = Number.isInteger(err.code) && err.code >= 100 && err.code <= 599
        this.response.status(hasHttpStatus ? err.code : 404).send({
          message: err.message || `File not found: ${this.name}`,
        })
      } else {
        // Part of the body is already on the wire; we can no longer change the
        // status, but we must not leave the response open.
        this.response.end()
      }

      if (this.end_callback) {
        this.end_callback(err)
      }
    }

    /*
     * End of stream handler. Ends the response and invokes the end callback.
     */
    on_end() {
      debug('End of stream.')
      // In multipart mode the closing delimiter is normally written when the
      // last range finishes. If the stream ended before that (e.g. an open
      // ended last range with unknown size), write it now.
      if (this.range_boundary && this.ranges && this.rangeIndex < this.ranges.ranges.length) {
        this.response.write(`\r\n--${this.range_boundary}--\r\n`)
      }
      this.response.end()

      if (this.end_callback) {
        this.end_callback()
      }
    }

    // **** No ranges

    /*
     * Sets status and headers for a full (non-range) response.
     */
    on_open_no_range() {
      // File got opened, so we can set headers/status
      debug('Open succeeded:', this.name, this.type)
      this.opened = true

      this.response.status(200)
      this.response.contentType(this.type)
      this.response.header('Accept-Ranges', 'bytes')
      this.response.header('Content-Transfer-Encoding', 'binary')

      if (this.download) {
        this.response.header('Content-Disposition', `attachment; filename="${this.name}"`)
      } else {
        this.response.header('Content-Disposition', 'inline')
      }

      if (this.size) {
        this.response.header('Content-Length', this.size)
      }
    }

    /*
     * Writes a chunk of a full (non-range) response.
     */
    on_data_no_range(chunk) {
      // Not every stream emits 'open'; make sure headers are set before data.
      if (!this.opened) {
        this.handlers.open()
      }

      // As simple as it can be.
      this.response.write(Buffer.from(chunk, 'binary'))
    }

    // *** With ranges

    /*
     * Advances to the next requested range and returns the headers describing
     * it, or undefined if there is no further range. The range entry itself is
     * normalized in place: a missing start becomes 0 and, if the total size is
     * known, a missing or too large end becomes size - 1.
     */
    nextRangeHeaders() {
      // Next range
      this.rangeIndex += 1
      if (this.rangeIndex >= this.ranges.ranges.length) {
        debug('Cannot advance range index; we are done.')
        return undefined
      }

      const range = this.ranges.ranges[this.rangeIndex]
      const totalSize = this.size ? this.size : undefined

      if (typeof range[0] === 'undefined') {
        range[0] = 0
      }
      if (typeof totalSize !== 'undefined') {
        // RFC 7233: an absent end, or one at/after the current length, means
        // "up to the last byte".
        if (typeof range[1] === 'undefined' || range[1] > totalSize - 1) {
          range[1] = totalSize - 1
        }
      }
      // NOTE: ranges starting at or beyond the total size are not rejected
      // here; no Content-Length is produced for them.

      // Write headers, but since we may be in a multipart situation, they are
      // returned rather than set on the response.
      const end = typeof range[1] === 'undefined' ? '' : `${range[1]}`
      const size_str = typeof totalSize === 'undefined' ? '*' : `${totalSize}`
      const headers = {
        'Content-Range': `bytes ${range[0]}-${end}/${size_str}`,
        'Content-Type': `${this.type}`,
      }

      if (typeof range[1] !== 'undefined') {
        const send_size = range[1] - range[0] + 1
        if (send_size > 0) {
          headers['Content-Length'] = `${send_size}`
        }
      }
      return headers
    }

    /*
     * Starts the next part of a multipart response. Returns true if a new
     * range was started. Returns false if there is nothing more to send, in
     * which case the 'data' listener is removed from the stream.
     */
    next_range() {
      if (this.ranges.ranges.length === 1) {
        debug('Cannot start new range; only one requested.')
        this.stream.off('data', this.handlers.data)
        return false
      }

      const headers = this.nextRangeHeaders()
      if (headers) {
        const header_buf = new streamBuf.WritableStreamBuffer()
        // We start a range with a boundary.
        header_buf.write(`\r\n--${this.range_boundary}\r\n`)

        // Then we write the range headers.
        Object.keys(headers).forEach((header) => {
          header_buf.write(`${header}: ${headers[header]}\r\n`)
        })
        header_buf.write('\r\n')
        this.response.write(header_buf.getContents())
        debug('New range started.')
        return true
      }

      // No headers means we're finishing the last range.
      this.response.write(`\r\n--${this.range_boundary}--\r\n`)
      debug('End of ranges sent.')
      this.stream.off('data', this.handlers.data)
      return false
    }

    /*
     * Sets status and headers for a range response and starts the first range.
     */
    on_open_ranges() {
      // File got opened, so we can set headers/status
      debug('Open succeeded:', this.name, this.type)
      this.opened = true

      this.response.header('Accept-Ranges', 'bytes')
      this.response.header('Content-Transfer-Encoding', 'binary')
      this.response.header('Content-Disposition', 'inline')

      // For single ranges, the content length should be the size of the
      // range. For multiple ranges, we don't send a content length
      // header.
      //
      // Similarly, the type is different whether or not there is more than
      // one range.
      if (this.ranges.ranges.length === 1) {
        this.response.writeHead(206, 'Partial Content', this.nextRangeHeaders())
      } else {
        this.range_boundary = uuid.v4()
        const headers = {
          'Content-Type': `multipart/byteranges; boundary=${this.range_boundary}`,
        }
        this.response.writeHead(206, 'Partial Content', headers)
        this.next_range()
      }
    }

    /*
     * Writes those parts of a chunk that fall into the requested ranges.
     */
    on_data_ranges(chunk) {
      // Not every stream emits 'open'; make sure headers are set before data.
      if (!this.opened) {
        this.handlers.open()
      }

      // Node.js streams give no guarantee for seek support, so we skip chunks
      // until our read position is within the range we want to send at the
      // moment. We're relying on ranges being in-order, which this file's
      // parser provides.
      //
      // The simplest optimization would be at every range start to seek() to
      // the start.
      const chunk_range = [this.read_offset, this.read_offset + chunk.length - 1]
      debug('= Got chunk with byte range', chunk_range)

      let buf // Created on first use only.
      while (true) {
        const requested = this.ranges.ranges[this.rangeIndex]
        if (!requested) {
          break
        }
        debug('Current requested range is', requested)

        // Only a missing end means "open ended"; an end offset of 0 is a
        // perfectly valid one-byte range.
        let req_range = requested
        if (typeof requested[1] === 'undefined') {
          req_range = [requested[0], Number.MAX_SAFE_INTEGER]
          debug('Treating as', req_range)
        }

        // No overlap in the chunk and requested range; don't write.
        if (chunk_range[1] < req_range[0] || chunk_range[0] > req_range[1]) {
          debug('Ignoring chunk; it is out of range.')
          break
        }

        // Since there is overlap, find the segment that's entirely within the
        // chunk.
        const segment = [Math.max(chunk_range[0], req_range[0]), Math.min(chunk_range[1], req_range[1])]
        debug('Segment to send within chunk is', segment)

        // Normalize the segment to a chunk offset
        const start = segment[0] - this.read_offset
        const end = segment[1] - this.read_offset
        const len = end - start + 1
        debug('Offsets into buffer are', [start, end], 'with length', len)

        // Write the slice that we want to write: a view onto the buffer made
        // from the chunk, bounded by the segment.
        if (!buf) {
          buf = Buffer.from(chunk, 'binary')
        }
        this.response.write(buf.subarray(start, start + len))

        if (req_range[1] > chunk_range[1]) {
          debug('Chunk is finished, but the requested range is missing bytes.')
          break
        }

        // The requested range is finished, so we should start the next one;
        // the rest of this chunk may already belong to it.
        debug('Range is finished.')
        if (!this.next_range()) {
          break
        }
      }

      // Update read offset when chunk is finished.
      this.read_offset += chunk.length
    }

    /*
     * Registers the event handlers on the stream, which starts the transfer.
     */
    start() {
      // Before we start streaming, let's ensure our ranges don't contain any
      // without start - if they do, we nuke them all and treat this as a full
      // request. The same goes for a ranges object without any usable entries.
      if (this.ranges) {
        const list = this.ranges.ranges
        const usable =
          Array.isArray(list) && list.length > 0 && !list.some((range) => typeof range[0] === 'undefined')
        if (!usable) {
          this.ranges = undefined
        }
      }

      // Register callbacks. Store them in a handlers object so we can
      // keep the bound version around for stopping to listen to events.
      this.handlers.error = this.on_error.bind(this)
      this.handlers.end = this.on_end.bind(this)

      if (this.ranges) {
        debug('Preparing to handle ranges.')
        this.handlers.open = this.on_open_ranges.bind(this)
        this.handlers.data = this.on_data_ranges.bind(this)
      } else {
        debug('No ranges, just send the whole file.')
        this.handlers.open = this.on_open_no_range.bind(this)
        this.handlers.data = this.on_data_no_range.bind(this)
      }

      Object.keys(this.handlers).forEach((event) => {
        this.stream.on(event, this.handlers[event])
      })
    }
  }
  /*
   * Convenience wrapper: creates a RangeSender from the given arguments and
   * starts it right away. Nothing is returned.
   */
  function send(response, stream, opts, end_callback) {
    new RangeSender(response, stream, opts, end_callback).start()
  }
  386. /*
  387. * Exports
  388. */
  389. module.exports = {
  390. parse,
  391. parseAsync,
  392. RangeSender,
  393. send,
  394. }