Skip to content
This repository was archived by the owner on Mar 31, 2026. It is now read-only.

Commit 89bfe7a

Browse files
authored
feat(experimental): flush the last chunk in append method (#1699)
Earlier the last chunk was being flushed while calling the close() method. Now it will be done inside the append method itself.
1 parent a57ea0e commit 89bfe7a

File tree

5 files changed

+162
-27
lines changed

5 files changed

+162
-27
lines changed

google/cloud/storage/_experimental/asyncio/async_appendable_object_writer.py

Lines changed: 30 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,9 @@ async def append(self, data: bytes) -> None:
188188
i.e. `self.offset` bytes relative to the beginning of the object.
189189
190190
This method sends the provided `data` to the GCS server in chunks
191-
and persists data in GCS at every `_MAX_BUFFER_SIZE_BYTES` bytes by
192-
calling `self.simple_flush`.
191+
and persists data in GCS at every `_DEFAULT_FLUSH_INTERVAL_BYTES` bytes
192+
or at the last chunk, whichever is earlier. Persisting is done by setting
193+
`flush=True` on request.
193194
194195
:type data: bytes
195196
:param data: The bytes to append to the object.
@@ -214,20 +215,33 @@ async def append(self, data: bytes) -> None:
214215
while start_idx < total_bytes:
215216
end_idx = min(start_idx + _MAX_CHUNK_SIZE_BYTES, total_bytes)
216217
data_chunk = data[start_idx:end_idx]
218+
is_last_chunk = end_idx == total_bytes
219+
chunk_size = end_idx - start_idx
217220
await self.write_obj_stream.send(
218221
_storage_v2.BidiWriteObjectRequest(
219222
write_offset=self.offset,
220223
checksummed_data=_storage_v2.ChecksummedData(
221224
content=data_chunk,
222225
crc32c=int.from_bytes(Checksum(data_chunk).digest(), "big"),
223226
),
227+
state_lookup=is_last_chunk,
228+
flush=is_last_chunk
229+
or (
230+
self.bytes_appended_since_last_flush + chunk_size
231+
>= self.flush_interval
232+
),
224233
)
225234
)
226-
chunk_size = end_idx - start_idx
227235
self.offset += chunk_size
228236
self.bytes_appended_since_last_flush += chunk_size
237+
229238
if self.bytes_appended_since_last_flush >= self.flush_interval:
230-
await self.simple_flush()
239+
self.bytes_appended_since_last_flush = 0
240+
241+
if is_last_chunk:
242+
response = await self.write_obj_stream.recv()
243+
self.persisted_size = response.persisted_size
244+
self.offset = self.persisted_size
231245
self.bytes_appended_since_last_flush = 0
232246
start_idx = end_idx
233247

@@ -292,20 +306,24 @@ async def close(self, finalize_on_close=False) -> Union[int, _storage_v2.Object]
292306
raise ValueError("Stream is not open. Call open() before close().")
293307

294308
if finalize_on_close:
295-
await self.finalize()
296-
else:
297-
await self.flush()
309+
return await self.finalize()
298310

299311
await self.write_obj_stream.close()
300312

301313
self._is_stream_open = False
302314
self.offset = None
303-
return self.object_resource if finalize_on_close else self.persisted_size
315+
return self.persisted_size
304316

305317
async def finalize(self) -> _storage_v2.Object:
306318
"""Finalizes the Appendable Object.
307319
308320
Note: Once finalized no more data can be appended.
321+
This method is different from `close`. If `.close()` is called, data may
322+
still be appended to object at a later point in time by opening with
323+
generation number.
324+
(i.e. `open(..., generation=<object_generation_number>)`).
325+
However, if `.finalize()` is called, no more data can be appended to the
326+
object.
309327
310328
:rtype: google.cloud.storage_v2.types.Object
311329
:returns: The finalized object resource.
@@ -322,6 +340,10 @@ async def finalize(self) -> _storage_v2.Object:
322340
response = await self.write_obj_stream.recv()
323341
self.object_resource = response.resource
324342
self.persisted_size = self.object_resource.size
343+
await self.write_obj_stream.close()
344+
345+
self._is_stream_open = False
346+
self.offset = None
325347
return self.object_resource
326348

327349
# helper methods.