mirror of https://github.com/google/brotli
Update (#569)
* add misssing fclose in `brotli.c` * add basic tests for python `Decompressor` type * minor lint fixes in `_brotli.cc`
This commit is contained in:
parent
58f5c37f3b
commit
1becbbf231
|
@ -553,12 +553,14 @@ static BROTLI_BOOL ReadDictionary(Context* context) {
|
|||
if (file_size_64 == -1) {
|
||||
fprintf(stderr, "could not get size of dictionary file [%s]",
|
||||
PrintablePath(context->dictionary_path));
|
||||
fclose(f);
|
||||
return BROTLI_FALSE;
|
||||
}
|
||||
|
||||
if (file_size_64 > kMaxDictionarySize) {
|
||||
fprintf(stderr, "dictionary [%s] is larger than maximum allowed: %d\n",
|
||||
PrintablePath(context->dictionary_path), kMaxDictionarySize);
|
||||
fclose(f);
|
||||
return BROTLI_FALSE;
|
||||
}
|
||||
context->dictionary_size = (size_t)file_size_64;
|
||||
|
@ -566,6 +568,7 @@ static BROTLI_BOOL ReadDictionary(Context* context) {
|
|||
buffer = (uint8_t*)malloc(context->dictionary_size);
|
||||
if (!buffer) {
|
||||
fprintf(stderr, "could not read dictionary: out of memory\n");
|
||||
fclose(f);
|
||||
return BROTLI_FALSE;
|
||||
}
|
||||
bytes_read = fread(buffer, sizeof(uint8_t), context->dictionary_size, f);
|
||||
|
@ -573,6 +576,7 @@ static BROTLI_BOOL ReadDictionary(Context* context) {
|
|||
free(buffer);
|
||||
fprintf(stderr, "failed to read dictionary [%s]: %s\n",
|
||||
PrintablePath(context->dictionary_path), strerror(errno));
|
||||
fclose(f);
|
||||
return BROTLI_FALSE;
|
||||
}
|
||||
fclose(f);
|
||||
|
|
|
@ -336,7 +336,7 @@ static PyObject* brotli_Compressor_finish(brotli_Compressor *self) {
|
|||
|
||||
end:
|
||||
if (ok) {
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size());
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size());
|
||||
} else {
|
||||
PyErr_SetString(BrotliError, "BrotliEncoderCompressStream failed while finishing the stream");
|
||||
}
|
||||
|
@ -531,7 +531,7 @@ static PyObject* brotli_Decompressor_process(brotli_Decompressor *self, PyObject
|
|||
|
||||
end:
|
||||
if (ok) {
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size());
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size());
|
||||
} else {
|
||||
PyErr_SetString(BrotliError, "BrotliDecoderDecompressStream failed while processing the stream");
|
||||
}
|
||||
|
@ -572,7 +572,7 @@ static PyObject* brotli_Decompressor_is_finished(brotli_Decompressor *self) {
|
|||
|
||||
end:
|
||||
if (ok) {
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.size() ? &output[0] : NULL), output.size());
|
||||
ret = PyBytes_FromStringAndSize((char*)(output.empty() ? NULL : &output[0]), output.size());
|
||||
} else {
|
||||
PyErr_SetString(BrotliError, "BrotliDecoderDecompressStream failed while finishing the stream");
|
||||
}
|
||||
|
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2016 The Brotli Authors. All rights reserved.
|
||||
#
|
||||
# Distributed under MIT license.
|
||||
# See file LICENSE for detail or copy at https://opensource.org/licenses/MIT
|
||||
|
||||
import functools
|
||||
import unittest
|
||||
|
||||
from . import _test_utils
|
||||
import brotli
|
||||
|
||||
|
||||
def _get_original_name(test_data):
|
||||
return test_data.split('.compressed')[0]
|
||||
|
||||
|
||||
class TestDecompressor(_test_utils.TestCase):
|
||||
|
||||
CHUNK_SIZE = 1
|
||||
|
||||
def setUp(self):
|
||||
self.decompressor = brotli.Decompressor()
|
||||
|
||||
def _check_decompression(self, test_data):
|
||||
# Verify decompression matches the original.
|
||||
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
||||
original = _get_original_name(test_data)
|
||||
self.assertFilesMatch(temp_uncompressed, original)
|
||||
|
||||
def _decompress(self, test_data):
|
||||
temp_uncompressed = _test_utils.get_temp_uncompressed_name(test_data)
|
||||
with open(temp_uncompressed, 'wb') as out_file:
|
||||
with open(test_data, 'rb') as in_file:
|
||||
read_chunk = functools.partial(in_file.read, self.CHUNK_SIZE)
|
||||
for data in iter(read_chunk, b''):
|
||||
out_file.write(self.decompressor.process(data))
|
||||
self.assertTrue(self.decompressor.is_finished())
|
||||
|
||||
def _test_decompress(self, test_data):
|
||||
self._decompress(test_data)
|
||||
self._check_decompression(test_data)
|
||||
|
||||
|
||||
_test_utils.generate_test_methods(TestDecompressor, for_decompression=True)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue