-
Notifications
You must be signed in to change notification settings - Fork 44
/
test_read_consistency.py
114 lines (92 loc) · 4.09 KB
/
test_read_consistency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import time
from datetime import datetime, timezone
from google.cloud import datastore
from . import _helpers
def _parent_key(datastore_client):
    """Build the ancestor key used by the helpers and queries in this module.

    Args:
        datastore_client: client object whose ``key`` method builds the key.

    Returns:
        Whatever ``datastore_client.key("Blog", "PizzaMan")`` returns.
    """
    kind, name = "Blog", "PizzaMan"
    return datastore_client.key(kind, name)
def _put_entity(datastore_client, entity_id):
    """Write a new ``read_time_test`` entity and return it.

    The entity is keyed under the ancestor from ``_parent_key`` and is stored
    with ``field`` set to ``"old_value"``.

    Args:
        datastore_client: client used to build the key and to ``put`` the entity.
        entity_id: identifier passed as the final element of the key path.

    Returns:
        The ``datastore.Entity`` that was handed to ``datastore_client.put``.
    """
    ancestor = _parent_key(datastore_client)
    entity_key = datastore_client.key("read_time_test", entity_id, parent=ancestor)

    stored = datastore.Entity(key=entity_key)
    stored["field"] = "old_value"
    datastore_client.put(stored)
    return stored
@pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
def test_get_w_read_time(datastore_client, entities_to_delete, database_id):
    """Check that ``get`` honours ``read_time``, directly and via a transaction.

    An entity is written, a timestamp is captured, and the entity is then
    overwritten. A plain ``get`` must see the new value, while a ``get`` pinned
    to the captured timestamp (passed directly or through a read-only
    transaction) must still see the old one.
    """
    stored = _put_entity(datastore_client, 1)
    # Registered with the fixture right away (presumably for cleanup; see conftest).
    entities_to_delete.append(stored)

    # Pad the captured timestamp with a second on each side so that clock
    # discrepancy between the server and this client does not blur the boundary.
    time.sleep(1)
    snapshot_time = datetime.now(tz=timezone.utc)
    time.sleep(1)

    # Overwrite the entity after the snapshot timestamp.
    stored["field"] = "new_value"
    datastore_client.put(stored)
    key = stored.key

    # A lookup with no read_time sees the latest write.
    latest = datastore_client.get(key)
    assert latest["field"] == "new_value"

    # A lookup with read_time passed straight to get() sees the earlier write.
    pinned = datastore_client.get(key, read_time=snapshot_time)
    assert pinned["field"] == "old_value"

    # The same read_time applied through a read-only transaction.
    with datastore_client.transaction(read_only=True, read_time=snapshot_time):
        pinned_in_txn = datastore_client.get(key)
        assert pinned_in_txn["field"] == "old_value"
@pytest.mark.parametrize("database_id", [None, _helpers.TEST_DATABASE], indirect=True)
def test_query_w_read_time(datastore_client, entities_to_delete, database_id):
    """Check that queries honour ``read_time``, directly and via a transaction.

    Three entities are written with ``field == "old_value"``, a timestamp is
    captured, and the third entity is then changed to ``"new_value"``. A query
    filtering on ``"old_value"`` must match two entities when run normally and
    all three when pinned to the captured timestamp.
    """
    first = _put_entity(datastore_client, 1)
    second = _put_entity(datastore_client, 2)
    third = _put_entity(datastore_client, 3)
    all_entities = (first, second, third)
    # Registered with the fixture (presumably for cleanup; see conftest).
    for created in all_entities:
        entities_to_delete.append(created)

    # Pad the captured timestamp with a second on each side so that clock
    # discrepancy between the server and this client does not blur the boundary.
    time.sleep(1)
    snapshot_time = datetime.now(tz=timezone.utc)
    time.sleep(1)

    # Only the third entity changes after the snapshot timestamp.
    third["field"] = "new_value"
    datastore_client.put(third)

    ancestor = _parent_key(datastore_client)
    query = datastore_client.query(kind="read_time_test", ancestor=ancestor)
    query = query.add_filter("field", "=", "old_value")

    # Query with no read_time: the updated entity no longer matches the filter.
    current_page = next(query.fetch().pages)
    current_results = list(current_page)
    assert len(current_results) == 2
    for found, expected in zip(current_results, (first, second)):
        assert found.key == expected.key

    # Query with read_time passed straight to fetch(): all three still match.
    pinned_page = next(query.fetch(read_time=snapshot_time).pages)
    pinned_results = list(pinned_page)
    assert len(pinned_results) == 3
    for found, expected in zip(pinned_results, all_entities):
        assert found.key == expected.key

    # Run the query in a read_only transaction with read_time.
    with datastore_client.transaction(read_only=True, read_time=snapshot_time):
        txn_page = next(query.fetch().pages)
        txn_results = list(txn_page)
        assert len(txn_results) == 3
        for found, expected in zip(txn_results, all_entities):
            assert found.key == expected.key