Using the API
Authentication / Configuration
Use Client objects to configure your applications.
Client objects hold both a project and an authenticated connection to the PubSub service.
The authentication credentials can be determined implicitly from the environment or supplied directly via from_service_account_json and from_service_account_p12.
After setting the GOOGLE_APPLICATION_CREDENTIALS and GCLOUD_PROJECT environment variables, create a Client:
>>> from gcloud import pubsub
>>> client = pubsub.Client()
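A pre-flight check can make a missing variable easier to diagnose than a failure inside the client constructor. The sketch below is illustrative only: `missing_settings` is a hypothetical helper, not part of gcloud, and it checks only that the two variables named above are set, not that their values are valid.

```python
import os

# The two environment variables named in the text above.
REQUIRED_VARS = ("GOOGLE_APPLICATION_CREDENTIALS", "GCLOUD_PROJECT")


def missing_settings(environ=None):
    """Return the names of required variables that are unset or empty.

    Hypothetical helper, not part of gcloud.
    """
    if environ is None:
        environ = os.environ
    return [name for name in REQUIRED_VARS if not environ.get(name)]


missing = missing_settings()
if missing:
    print("Set these before creating a Client: " + ", ".join(missing))
else:
    print("Environment looks complete.")
```

Passing a plain dictionary instead of relying on `os.environ` keeps the check easy to exercise in tests.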
Manage topics for a project
List topics for the default project:
topics, token = client.list_topics()  # API request
while True:
    for topic in topics:
        do_something_with(topic)
    if token is None:
        break
    topics, token = client.list_topics(page_token=token)  # API request
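The token loop above can be wrapped in a generator so that callers see one flat sequence. This is a minimal sketch, not a library feature: `iterate_all` is a hypothetical helper, and `fake_list_topics` is a stand-in that mimics the `(items, token)` return shape so the example runs without credentials.

```python
def iterate_all(list_method, **kwargs):
    """Yield every item from a method that returns ``(items, next_token)``.

    Hypothetical helper, not part of gcloud. ``list_method`` is assumed to
    accept a ``page_token`` keyword, as ``client.list_topics`` does above.
    """
    items, token = list_method(**kwargs)
    while True:
        for item in items:
            yield item
        if token is None:
            return
        items, token = list_method(page_token=token, **kwargs)


# Stand-in for client.list_topics: two pages linked by the token "t1".
_PAGES = {None: (["topic-a", "topic-b"], "t1"), "t1": (["topic-c"], None)}


def fake_list_topics(page_token=None):
    return _PAGES[page_token]


all_topics = list(iterate_all(fake_list_topics))
```

With a real client the call would presumably read `for topic in iterate_all(client.list_topics): ...`, issuing one API request per page as the loop advances.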
Create a new topic for the default project:
topic = client.topic(TOPIC_NAME)
topic.create() # API request
Check for the existence of a topic:
assert not topic.exists() # API request
topic.create() # API request
assert topic.exists() # API request
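The existence check and the creation call are often combined into a "create if missing" step. The sketch below assumes only the `exists()` and `create()` methods shown above; `ensure_exists` and `FakeTopic` are hypothetical names introduced for illustration.

```python
def ensure_exists(resource):
    """Create ``resource`` unless it already exists; return True if created.

    Hypothetical helper. The check and the create are separate requests,
    so another process could create the resource in between them.
    """
    if resource.exists():   # API request on a real topic
        return False
    resource.create()       # API request on a real topic
    return True


class FakeTopic(object):
    """In-memory stand-in so the sketch runs without credentials."""

    def __init__(self):
        self._created = False

    def exists(self):
        return self._created

    def create(self):
        self._created = True


fake = FakeTopic()
first = ensure_exists(fake)    # creates the topic
second = ensure_exists(fake)   # no-op: it already exists
```

Because of the gap between the two requests, code that runs concurrently may still need to handle a failed `create()`.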
Delete a topic:
assert topic.exists() # API request
topic.delete()  # API request
assert not topic.exists() # API request
Fetch the IAM policy for a topic:
policy = topic.get_iam_policy() # API request
Update the IAM policy for a topic:
ALL_USERS = policy.all_users()
policy.viewers.add(ALL_USERS)
LOGS_GROUP = policy.group('cloud-logs@google.com')
policy.editors.add(LOGS_GROUP)
new_policy = topic.set_iam_policy(policy) # API request
Test permissions allowed by the current IAM policy on a topic:
from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
ALLOWED = topic.check_iam_permissions(TO_CHECK)
assert set(ALLOWED) == set(TO_CHECK)
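The assertion above fails outright when any permission is missing. A gentler variant reports which of the requested permissions were not granted. This is a sketch: `missing_permissions` is a hypothetical helper, and the strings below are placeholders rather than the real values of the role constants.

```python
def missing_permissions(requested, allowed):
    """Return the requested permissions that were not granted, in order."""
    granted = set(allowed)
    return [perm for perm in requested if perm not in granted]


# Placeholder strings stand in for OWNER_ROLE, EDITOR_ROLE and VIEWER_ROLE;
# the real values come from gcloud.pubsub.iam.
TO_CHECK = ["owner", "editor", "viewer"]
ALLOWED = ["viewer", "editor"]   # e.g. what check_iam_permissions returned

denied = missing_permissions(TO_CHECK, ALLOWED)
```

Here `denied` lists the permissions to request from a project administrator.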
Publish messages to a topic
Publish a single message to a topic, without attributes:
topic.publish(b'This is the message payload') # API request
Publish a single message to a topic, with attributes:
topic.publish(b'Another message payload', extra='EXTRA') # API request
Publish a set of messages to a topic (as a single request):
with topic.batch() as batch:
    batch.publish(b'This is the message payload')
    batch.publish(b'Another message payload', extra='EXTRA')
Note
The only API request happens during the __exit__() of the batch
used as a context manager, and only if the block exits without raising
an exception.
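The commit-on-clean-exit behaviour described in the note can be illustrated with a toy context manager. This is a sketch of the pattern only, not the library's implementation: `SketchBatch` and its `send` callable are hypothetical.

```python
class SketchBatch(object):
    """Toy batch that sends queued messages only on a clean exit."""

    def __init__(self, send):
        self._send = send        # callable taking a list of messages
        self._messages = []

    def publish(self, payload, **attrs):
        # Queue locally; nothing is sent yet.
        self._messages.append((payload, attrs))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is None:
            self._send(list(self._messages))   # the single "request"
        # Returning None lets any exception propagate to the caller.


sent = []

with SketchBatch(sent.append) as batch:
    batch.publish(b"first")
    batch.publish(b"second", extra="EXTRA")

try:
    with SketchBatch(sent.append) as batch:
        batch.publish(b"never sent")
        raise RuntimeError("boom")
except RuntimeError:
    pass   # the second batch was discarded, not sent
```

After both blocks run, `sent` holds exactly one list: the two messages from the first block.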
Manage subscriptions to topics
List all subscriptions for the default project:
subscriptions, token = client.list_subscriptions()  # API request
while True:
    for subscription in subscriptions:
        do_something_with(subscription)
    if token is None:
        break
    subscriptions, token = client.list_subscriptions(
        page_token=token)  # API request
List subscriptions for a topic:
subscriptions, token = topic.list_subscriptions()  # API request
while True:
    for subscription in subscriptions:
        do_something_with(subscription)
    if token is None:
        break
    subscriptions, token = topic.list_subscriptions(
        page_token=token)  # API request
Create a new pull subscription for a topic, with defaults:
sub_defaults = topic.subscription(SUB_DEFAULTS)
sub_defaults.create()  # API request
Create a new pull subscription for a topic with a non-default ACK deadline:
sub_ack90 = topic.subscription(SUB_ACK90, ack_deadline=90)
sub_ack90.create()  # API request
Create a new push subscription for a topic:
subscription = topic.subscription(SUB_PUSH, push_endpoint=PUSH_URL)
subscription.create() # API request
Check for the existence of a subscription:
assert subscription.exists() # API request
Convert a pull subscription to push:
subscription.modify_push_configuration(
    push_endpoint=PUSH_URL)  # API request
Convert a push subscription to pull:
subscription.modify_push_configuration(push_endpoint=None) # API request
Re-synchronize a subscription with the back-end:
subscription.reload() # API request
Delete a subscription:
subscription.delete() # API request
Pull messages from a subscription
Fetch pending messages for a pull subscription:
pulled = subscription.pull(max_messages=2)
Note that received messages must be acknowledged, or else the back-end will re-send them later:
for ack_id, message in pulled:
    try:
        do_something_with(message)
    except ApplicationException as e:
        log_exception(e)
    else:
        subscription.acknowledge([ack_id])
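Since `acknowledge` takes a list of ack IDs, the successful ones can be collected and acknowledged in a single call. The sketch below shows one way to do that. `process_and_ack` is a hypothetical helper, and the fake handler and recorder exist only so the example runs without a real subscription.

```python
def process_and_ack(pulled, handler, acknowledge):
    """Run ``handler`` on each message and acknowledge the successes together.

    Hypothetical helper. ``pulled`` is an iterable of ``(ack_id, message)``
    pairs and ``acknowledge`` a callable taking a list of ack IDs, such as
    ``subscription.acknowledge``. Returns the list of acknowledged IDs.
    """
    done = []
    for ack_id, message in pulled:
        try:
            handler(message)
        except Exception as exc:   # narrow this in real code
            print("processing failed for %s: %s" % (ack_id, exc))
        else:
            done.append(ack_id)
    if done:
        acknowledge(done)          # one API request on a real subscription
    return done


def handler(message):
    if message == "bad":
        raise ValueError("cannot handle")


acked_calls = []   # records each list passed to the fake acknowledge
acked = process_and_ack(
    [("id-1", "ok"), ("id-2", "bad"), ("id-3", "ok")],
    handler,
    acked_calls.append,
)
```

Batching saves requests but delays each acknowledgement until the whole list is processed, so slow handlers may let the ACK deadline lapse. The failed message is left unacknowledged and will be re-sent by the back-end.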
Fetch messages for a pull subscription without blocking (none pending):
pulled = subscription.pull(return_immediately=True)
Update the acknowledgement deadline for pulled messages:
for ack_id, _ in pulled:
    subscription.modify_ack_deadline(ack_id, 90)  # API request
Fetch the IAM policy for a subscription:
policy = subscription.get_iam_policy() # API request
Update the IAM policy for a subscription:
ALL_USERS = policy.all_users()
policy.viewers.add(ALL_USERS)
LOGS_GROUP = policy.group('cloud-logs@google.com')
policy.editors.add(LOGS_GROUP)
new_policy = subscription.set_iam_policy(policy) # API request
Test permissions allowed by the current IAM policy on a subscription:
from gcloud.pubsub.iam import OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE
TO_CHECK = [OWNER_ROLE, EDITOR_ROLE, VIEWER_ROLE]
ALLOWED = subscription.check_iam_permissions(TO_CHECK)
assert set(ALLOWED) == set(TO_CHECK)