Page Summary
- This example demonstrates how to download reports from multiple Google Ads accounts concurrently.
- The example provides code implementations in Java, C#, Python, and Ruby.
- It includes predefined Google Ads Query Language (GAQL) strings for campaign and ad group reports.
- The code handles potential errors during report downloads and reports success or failure for each customer ID and query combination.
- Note that parallelizing requests to the same customer ID is not recommended due to rate limits.
Java
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.ads.googleads.examples.reporting;

import com.beust.jcommander.Parameter;
import com.google.ads.googleads.examples.utils.ArgumentNames;
import com.google.ads.googleads.examples.utils.CodeSampleParams;
import com.google.ads.googleads.lib.GoogleAdsClient;
import com.google.ads.googleads.v24.errors.GoogleAdsError;
import com.google.ads.googleads.v24.errors.GoogleAdsException;
import com.google.ads.googleads.v24.services.GoogleAdsServiceClient;
import com.google.ads.googleads.v24.services.SearchGoogleAdsStreamRequest;
import com.google.ads.googleads.v24.services.SearchGoogleAdsStreamResponse;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StreamController;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Demonstrates downloading a fixed set of reports from several accounts at the same time.
 *
 * <p>Each query in {@link #GAQL_QUERY_STRINGS} is issued to every customer ID concurrently; the
 * next query only starts once the current one has finished for all customer IDs.
 *
 * <p>To obtain a list of accounts, see the {@link
 * com.google.ads.googleads.examples.accountmanagement.GetAccountHierarchy} or {@link
 * com.google.ads.googleads.examples.accountmanagement.ListAccessibleCustomers} examples.
 */
public class ParallelReportDownload {

  // Adjust as required.
  /** The Google Ads Query Language (GAQL) queries that are run against every customer ID. */
  private static final List<String> GAQL_QUERY_STRINGS =
      ImmutableList.of(
          "SELECT campaign.id, metrics.impressions, metrics.clicks FROM campaign"
              + " WHERE segments.date DURING LAST_30_DAYS",
          "SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks FROM ad_group"
              + " WHERE segments.date DURING LAST_30_DAYS");

  /** Command line parameters accepted by this example. */
  private static class ParallelReportDownloadParams extends CodeSampleParams {

    @Parameter(
        names = ArgumentNames.CUSTOMER_IDS,
        required = true,
        description =
            "Specify a comma-separated list of customer IDs to downloads reports from.")
    List<Long> customerIds;

    @Parameter(
        names = ArgumentNames.LOGIN_CUSTOMER_ID,
        description =
            "Optionally specify the manager account ID which provides access to the customer IDs")
    Long loginCustomerId;
  }

  /**
   * Parses the arguments, builds the API client from the properties file and runs the example.
   *
   * @param args the command line arguments.
   * @throws InterruptedException if the thread is interrupted while waiting for the reports.
   */
  public static void main(String[] args) throws InterruptedException {
    ParallelReportDownloadParams params = new ParallelReportDownloadParams();
    if (!params.parseArguments(args)) {
      // Either pass the required parameters for this example on the command line, or insert them
      // into the code here. See the parameter class definition above for descriptions.
      params.customerIds = ImmutableList.of(Long.valueOf("INSERT CUSTOMER IDS"));

      // Optionally specify the login customer ID if your access to the CIDs is via a manager
      // account.
      // params.loginCustomerId = Long.parseLong("INSERT_LOGIN_CUSTOMER_ID");
    }

    // Both catch clauses return, so the client is always assigned when execution continues.
    GoogleAdsClient googleAdsClient;
    try {
      GoogleAdsClient.Builder clientBuilder = GoogleAdsClient.newBuilder().fromPropertiesFile();
      if (params.loginCustomerId != null) {
        clientBuilder.setLoginCustomerId(params.loginCustomerId);
      }
      googleAdsClient = clientBuilder.build();
    } catch (FileNotFoundException fnfe) {
      System.err.printf(
          "Failed to load GoogleAdsClient configuration from file. Exception: %s%n", fnfe);
      return;
    } catch (IOException ioe) {
      System.err.printf("Failed to create GoogleAdsClient. Exception: %s%n", ioe);
      return;
    }

    try {
      new ParallelReportDownload().runExample(googleAdsClient, params.customerIds);
    } catch (GoogleAdsException gae) {
      printGoogleAdsErrors(gae);
    }
  }

  /**
   * Prints the request ID of a failed request followed by each underlying error.
   *
   * <p>GoogleAdsException is the base class for most exceptions thrown by an API request.
   * Instances of this exception have a message and a GoogleAdsFailure that contains a collection
   * of GoogleAdsErrors that indicate the underlying causes of the GoogleAdsException.
   *
   * @param gae the exception to describe on standard error.
   */
  private static void printGoogleAdsErrors(GoogleAdsException gae) {
    System.err.printf(
        "Request ID %s failed due to GoogleAdsException. Underlying errors:%n",
        gae.getRequestId());
    List<GoogleAdsError> errors = gae.getGoogleAdsFailure().getErrorsList();
    for (int index = 0; index < errors.size(); index++) {
      System.err.printf(" Error %d: %s%n", index, errors.get(index));
    }
  }

  /**
   * Runs the example.
   *
   * @param googleAdsClient the client library instance for API access.
   * @param customerIds the customer IDs to run against.
   * @throws InterruptedException if the thread is interrupted while waiting for the reports.
   */
  private void runExample(GoogleAdsClient googleAdsClient, List<Long> customerIds)
      throws InterruptedException {
    // Creates a single client which can be shared by all threads.
    // gRPC handles multiplexing parallel requests to the underlying API connection.
    try (GoogleAdsServiceClient serviceClient =
        googleAdsClient.getLatestVersion().createGoogleAdsServiceClient()) {
      // IMPORTANT: You should avoid hitting the same customer ID in parallel. There are rate limits
      // at the customer ID level which are much stricter than limits at the developer token level.
      // Hitting these limits frequently enough will significantly reduce throughput as the client
      // library will automatically retry with exponential back-off before failing the request.
      for (String gaqlQuery : GAQL_QUERY_STRINGS) {
        // One future per customer ID, so that this report is known to be complete everywhere
        // before the next query starts. The Future data type here is just for demonstration.
        List<ListenableFuture<ReportSummary>> pendingReports = new ArrayList<>();
        for (Long customerId : customerIds) {
          pendingReports.add(startReportDownload(serviceClient, customerId, gaqlQuery));
        }

        // Waits for all pending requests to the current set of customer IDs to complete.
        //
        // This is a naive implementation for illustrative purposes. It is possible to optimize the
        // utilization of each customer ID by providing a queue of work (or similar). However, this
        // would complicate the example code and so is omitted here.
        List<ReportSummary> summaries = Futures.allAsList(pendingReports).get();

        System.out.println("Report results for query: " + gaqlQuery);
        for (ReportSummary summary : summaries) {
          System.out.println(summary);
        }
      }
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Starts one streaming report download without waiting for it to finish.
   *
   * <p>Uses the gRPC asynchronous API to download the reports in parallel. This saves having to
   * create/manage our own thread pool.
   *
   * @param serviceClient the shared service client used to issue the request.
   * @param customerId the customer ID to query.
   * @param gaqlQuery the GAQL query to run.
   * @return a future which receives the summary once the stream completes or fails.
   */
  private static ListenableFuture<ReportSummary> startReportDownload(
      GoogleAdsServiceClient serviceClient, Long customerId, String gaqlQuery) {
    ResponseCountingObserver observer = new ResponseCountingObserver(customerId);
    SearchGoogleAdsStreamRequest request =
        SearchGoogleAdsStreamRequest.newBuilder()
            .setCustomerId(customerId.toString())
            .setQuery(gaqlQuery)
            .build();
    // Starts the report download in a background thread.
    serviceClient.searchStreamCallable().call(request, observer);
    return observer.asFuture();
  }

  /** An observer which records a simple count of the result rows received. */
  private static class ResponseCountingObserver
      implements ResponseObserver<SearchGoogleAdsStreamResponse> {

    private final long customerId;
    private final SettableFuture<ReportSummary> result = SettableFuture.create();
    private final AtomicLong responseCount = new AtomicLong(0);

    ResponseCountingObserver(long customerId) {
      this.customerId = customerId;
    }

    @Override
    public void onStart(StreamController controller) {
      // Nothing to do here.
    }

    @Override
    public void onResponse(SearchGoogleAdsStreamResponse response) {
      // Does something useful with the response. In this case we just count the responses, but
      // could also write the response to a database/file, pass the response on to another method
      // for further processing, etc.
      //
      // Note: this method may be called from multiple threads, though responses will always arrive
      // in the same order as returned by the API.
      responseCount.incrementAndGet();
    }

    @Override
    public void onError(Throwable t) {
      // The failure is delivered as a normal value carrying the throwable, so anything waiting on
      // the future is unblocked with a summary rather than an exception.
      result.set(new ReportSummary(customerId, responseCount.get(), t));
    }

    @Override
    public void onComplete() {
      // The stream finished normally; publish the final count.
      result.set(new ReportSummary(customerId, responseCount.get()));
    }

    /** Gets a {@link ListenableFuture} which represents the result of this stream. */
    ListenableFuture<ReportSummary> asFuture() {
      return result;
    }
  }

  /** Summarizes the result of a reporting API call. */
  private static class ReportSummary {

    private final Long customerId;
    private final long numResponses;
    private final Throwable throwable;

    /** Creates a summary for a download that ended with the given error. */
    ReportSummary(Long customerId, long numResponses, Throwable throwable) {
      this.customerId = customerId;
      this.throwable = throwable;
      this.numResponses = numResponses;
    }

    /** Creates a summary for a download that completed without an error. */
    ReportSummary(Long customerId, long numResponses) {
      this(customerId, numResponses, null);
    }

    /** Returns true when no throwable was recorded for this download. */
    boolean isSuccess() {
      return throwable == null;
    }

    @Override
    public String toString() {
      StringBuilder text =
          new StringBuilder()
              .append("Customer ID '")
              .append(customerId)
              .append("' Number of responses: ")
              .append(numResponses)
              .append(" IsSuccess? ");
      if (isSuccess()) {
        text.append("Yes!");
      } else {
        text.append("No :-( Why? ").append(throwable.getMessage());
      }
      return text.toString();
    }
  }
}
C#
// Copyright 2020 Google LLC.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using CommandLine;
using Google.Ads.Gax.Examples;
using Google.Ads.GoogleAds.Lib;
using Google.Ads.GoogleAds.V24.Errors;
using Google.Ads.GoogleAds.V24.Services;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Google.Ads.GoogleAds.Examples.V24
{
    /// <summary>
    /// Shows how to download a set of reports from a list of accounts in parallel. If you need
    /// to obtain a list of accounts, please see the GetAccountHierarchy or
    /// ListAccessibleCustomers examples.
    /// </summary>
    public class ParallelReportDownload : ExampleBase
    {
        /// <summary>
        /// Command line options for running the <see cref="ParallelReportDownload"/> example.
        /// </summary>
        public class Options : OptionsBase
        {
            /// <summary>
            /// The Google Ads customer IDs for which the call is made.
            /// </summary>
            [Option("customerIds", Required = true, HelpText =
                "The Google Ads customer IDs for which the call is made.")]
            public IEnumerable<long> CustomerIds { get; set; }

            /// <summary>
            /// Optional login customer ID if your access to the CIDs is via a manager account.
            /// </summary>
            [Option("loginCustomerId", Required = false, HelpText =
                "Optional login customer ID if your access to the CIDs is via a manager account.")]
            public long? LoginCustomerId { get; set; }
        }

        /// <summary>
        /// Main method, to run this code example as a standalone application.
        /// </summary>
        /// <param name="args">The command line arguments.</param>
        public static void Main(string[] args)
        {
            Options options = ExampleUtilities.ParseCommandLine<Options>(args);

            ParallelReportDownload codeExample = new ParallelReportDownload();
            Console.WriteLine(codeExample.Description);
            codeExample.Run(new GoogleAdsClient(), options.CustomerIds.ToArray(),
                options.LoginCustomerId);
        }

        // Defines the Google Ads Query Language (GAQL) query strings to run for each customer ID.
        private readonly Dictionary<string, string> GAQL_QUERY_STRINGS =
            new Dictionary<string, string>()
            {
                {
                    "Campaign Query",
                    @"SELECT campaign.id, metrics.impressions, metrics.clicks FROM campaign WHERE segments.date DURING LAST_30_DAYS"
                },
                {
                    "Ad Group Query",
                    @"SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks FROM ad_group WHERE segments.date DURING LAST_30_DAYS"
                }
            };

        /// <summary>
        /// Returns a description about the code example.
        /// </summary>
        public override string Description =>
            "Shows how to download a set of reports from a list of accounts in parallel. If you " +
            "need to obtain a list of accounts, please see the GetAccountHierarchy or " +
            "ListAccessibleCustomers examples.";

        /// <summary>
        /// Runs the code example.
        /// </summary>
        /// <param name="client">The Google Ads client.</param>
        /// <param name="customerIds">The Google Ads customer IDs to download reports
        /// from.</param>
        /// <param name="loginCustomerId">Optional login customer ID if your access to the CIDs
        /// is via a manager account.</param>
        public void Run(GoogleAdsClient client, long[] customerIds, long? loginCustomerId)
        {
            // If a manager ID is supplied, update the login credentials.
            if (loginCustomerId.HasValue)
            {
                client.Config.LoginCustomerId = loginCustomerId.ToString();
            }

            // Get the GoogleAdsService. A single client can be shared by all threads.
            GoogleAdsServiceClient googleAdsService = client.GetService(
                Services.V24.GoogleAdsService);

            try
            {
                // Begin downloading reports and block program termination until complete.
                Task task = RunDownloadParallelAsync(googleAdsService, customerIds);
                task.Wait();
            }
            catch (GoogleAdsException e)
            {
                // Retained from the original handler for a GoogleAdsException raised directly.
                Console.WriteLine("Failure:");
                WriteFailureDetails(e);
                throw;
            }
            catch (AggregateException ae)
            {
                // Task.Wait() reports a faulted task by throwing an AggregateException, so a
                // failure of the download task arrives here rather than in the handler above.
                // Flatten() collapses nested AggregateExceptions before the lookup.
                GoogleAdsException gae = GoogleAdsException.FromTaskException(ae.Flatten());
                Console.WriteLine("Failure:");
                if (gae != null)
                {
                    WriteFailureDetails(gae);
                }
                else
                {
                    Console.WriteLine($"Message: {ae.Message}");
                }
                // Rethrow unchanged so callers observe the same exception as before.
                throw;
            }
        }

        /// <summary>
        /// Writes the message, failure details and request ID of an API exception to the
        /// console.
        /// </summary>
        /// <param name="e">The exception to describe.</param>
        private static void WriteFailureDetails(GoogleAdsException e)
        {
            Console.WriteLine($"Message: {e.Message}");
            Console.WriteLine($"Failure: {e.Failure}");
            Console.WriteLine($"Request ID: {e.RequestId}");
        }

        /// <summary>
        /// Initiate all download requests, wait for their completion, and report the results.
        /// </summary>
        /// <param name="googleAdsService">The Google Ads service.</param>
        /// <param name="customerIds">The list of customer IDs from which to request data.</param>
        /// <returns>The asynchronous operation.</returns>
        private async Task RunDownloadParallelAsync(
            GoogleAdsServiceClient googleAdsService, long[] customerIds)
        {
            // List of all requests to ensure that we wait for the reports to complete on all
            // customer IDs before proceeding.
            ConcurrentBag<Task<bool>> tasks = new ConcurrentBag<Task<bool>>();

            // Collection of downloaded responses.
            ConcurrentBag<ReportDownload> responses = new ConcurrentBag<ReportDownload>();

            // IMPORTANT: You should avoid hitting the same customer ID in parallel. There are rate
            // limits at the customer ID level which are much stricter than limits at the developer
            // token level. Hitting these limits frequently enough will significantly reduce
            // throughput as the client library will automatically retry with exponential back-off
            // before failing the request.
            //
            // For that reason the queries are walked one after another and only the customer IDs
            // are fanned out in parallel: Parallel.ForEach returns once every customer ID has
            // been handled for the current query, so two queries are never started for the same
            // customer ID at the same time.
            foreach (KeyValuePair<string, string> query in GAQL_QUERY_STRINGS)
            {
                Parallel.ForEach(customerIds, customerId =>
                {
                    Console.WriteLine($"Requesting {query.Key} for CID {customerId}.");

                    // Issue the search request and add it to the list of requests.
                    tasks.Add(DownloadReportAsync(googleAdsService, customerId, query.Key,
                        query.Value, responses));
                });
            }

            Console.WriteLine($"Awaiting results from {tasks.Count} requests...\n");

            // Proceed only when all requests have completed.
            await Task.WhenAll(tasks);

            // Give a summary line for each recorded download, whether it succeeded or failed.
            foreach (ReportDownload reportDownload in responses)
            {
                Console.WriteLine(reportDownload);
            }
        }

        /// <summary>
        /// Initiates one report download and records its outcome.
        /// </summary>
        /// <param name="googleAdsService">The Google Ads service client.</param>
        /// <param name="customerId">The customer ID from which data is requested.</param>
        /// <param name="queryKey">The name of the query to be downloaded.</param>
        /// <param name="queryValue">The query for the download request.</param>
        /// <param name="responses">Collection that receives one entry per streamed response,
        /// or one entry describing the failure.</param>
        /// <returns>A completed task whose result is true when the request succeeded and false
        /// when an <see cref="AggregateException"/> was caught.</returns>
        private Task<bool> DownloadReportAsync(
            GoogleAdsServiceClient googleAdsService, long customerId, string queryKey,
            string queryValue, ConcurrentBag<ReportDownload> responses)
        {
            try
            {
                // Issue the download request; the callback runs for each streamed response.
                googleAdsService.SearchStream(customerId.ToString(), queryValue,
                    delegate (SearchGoogleAdsStreamResponse resp)
                    {
                        // Store the results.
                        responses.Add(new ReportDownload()
                        {
                            CustomerId = customerId,
                            QueryKey = queryKey,
                            Response = resp
                        });
                    }
                );
                return Task.FromResult(true);
            }
            catch (AggregateException ae)
            {
                Console.WriteLine($"Download failed for {queryKey} and CID {customerId}!");

                GoogleAdsException gae = GoogleAdsException.FromTaskException(ae);
                if (gae != null)
                {
                    WriteFailureDetails(gae);
                }

                // Record the API exception when one could be extracted, otherwise the
                // AggregateException itself.
                responses.Add(new ReportDownload()
                {
                    CustomerId = customerId,
                    QueryKey = queryKey,
                    Exception = (Exception) gae ?? ae
                });

                return Task.FromResult(false);
            }
        }

        /// <summary>
        /// Stores a result from a reporting API call. In this case we simply report a count of
        /// the responses, but one could also write the response to a database/file, pass the
        /// response on to another method for further processing, etc.
        /// </summary>
        private class ReportDownload
        {
            internal long CustomerId { get; set; }

            internal string QueryKey { get; set; }

            internal SearchGoogleAdsStreamResponse Response { get; set; }

            internal Exception Exception { get; set; }

            /// <summary>
            /// Describes the download: the exception when one was recorded, otherwise the
            /// number of rows in the stored response.
            /// </summary>
            public override string ToString()
            {
                if (Exception != null)
                {
                    return $"Download failed for {QueryKey} and CID {CustomerId}. " +
                        $"Exception: {Exception}";
                }
                else
                {
                    return $"{QueryKey} downloaded for CID {CustomerId}: " +
                        $"{Response.Results.Count} rows returned.";
                }
            }
        }
    }
}
PHP
This is not applicable to PHP because multi-threading cannot be used in a web server environment.
Python
#!/usr/bin/env python
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Shows how to download in parallel a set of reports from a list of accounts.

If you need to obtain a list of accounts, please see the
account_management/get_account_hierarchy.py or
account_management/list_accessible_customers.py examples.
"""

import argparse
from itertools import product
import multiprocessing
import time
from typing import Any, Dict, Iterable, List, Tuple

from google.ads.googleads.client import GoogleAdsClient
from google.ads.googleads.errors import GoogleAdsException
from google.ads.googleads.v24.errors.types import (
    ErrorLocation,
    GoogleAdsError,
)
from google.ads.googleads.v24.services.services.google_ads_service import (
    GoogleAdsServiceClient,
)
from google.ads.googleads.v24.services.types import (
    GoogleAdsRow,
    SearchGoogleAdsStreamResponse,
)

# Maximum number of processes to spawn.
MAX_PROCESSES: int = multiprocessing.cpu_count()
# Timeout between retries in seconds.
BACKOFF_FACTOR: int = 5
# Maximum number of retries for errors.
MAX_RETRIES: int = 5


def main(client: GoogleAdsClient, customer_ids: List[str]) -> None:
    """The main method that creates all necessary entities for the example.

    Runs each report query against every customer ID and prints the successful
    and failed results.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_ids: an array of client customer IDs.
    """
    # Define the GAQL query strings to run for each customer ID.
    campaign_query: str = (
        " SELECT campaign.id, metrics.impressions, metrics.clicks"
        " FROM campaign"
        " WHERE segments.date DURING LAST_30_DAYS"
    )
    ad_group_query: str = (
        " SELECT campaign.id, ad_group.id, metrics.impressions, metrics.clicks"
        " FROM ad_group"
        " WHERE segments.date DURING LAST_30_DAYS"
    )
    queries: List[str] = [campaign_query, ad_group_query]

    results: List[Tuple[bool, Dict[str, Any]]] = []
    with multiprocessing.Pool(MAX_PROCESSES) as pool:
        # Run one query at a time and parallelize only across customer IDs.
        # starmap blocks until every customer ID has finished the current
        # query, so two queries are never in flight for the same customer ID,
        # which avoids tripping the stricter per-customer rate limits.
        query: str
        for query in queries:
            inputs: Iterable[Tuple[GoogleAdsClient, str, str]] = (
                generate_inputs(client, customer_ids, [query])
            )
            # Call issue_search_request on each input, parallelizing the work
            # across processes in the pool.
            results.extend(pool.starmap(issue_search_request, inputs))

    # Partition our results into successful and failed results.
    successes: List[Dict[str, Any]] = []
    failures: List[Dict[str, Any]] = []
    succeeded: bool
    payload: Dict[str, Any]
    for succeeded, payload in results:
        if succeeded:
            successes.append(payload)
        else:
            failures.append(payload)

    # Output results.
    print(
        f"Total successful results: {len(successes)}\n"
        f"Total failed results: {len(failures)}\n"
    )

    if successes:
        print("Successes:")
    success: Dict[str, Any]
    for success in successes:
        # success["results"] represents an array of result strings for one
        # customer ID / query combination.
        result_str: str = "\n".join(success["results"])
        print(result_str)

    if failures:
        print("Failures:")
    failure: Dict[str, Any]
    for failure in failures:
        ex: GoogleAdsException = failure["exception"]
        print(
            f'Request with ID "{ex.request_id}" failed with status '
            f'"{ex.error.code().name}" for customer_id '
            f'{failure["customer_id"]} and query "{failure["query"]}" and '
            "includes the following errors:"
        )
        error: GoogleAdsError
        for error in ex.failure.errors:
            print(f'\tError with message "{error.message}".')
            if error.location:
                field_path_element: ErrorLocation.FieldPathElement
                for (
                    field_path_element
                ) in error.location.field_path_elements:
                    print(f"\t\tOn field: {field_path_element.field_name}")


def issue_search_request(
    client: GoogleAdsClient, customer_id: str, query: str
) -> Tuple[bool, Dict[str, Any]]:
    """Issues a search request using streaming.

    Retries if a GoogleAdsException is caught, until MAX_RETRIES is reached.

    Args:
        client: an initialized GoogleAdsClient instance.
        customer_id: a client customer ID str.
        query: a GAQL query str.

    Returns:
        A tuple whose first item tells whether the request succeeded. On
        success the second item is a dict with a "results" list of formatted
        row strings; on failure it is a dict holding the "exception", the
        "customer_id" and the "query".
    """
    ga_service: GoogleAdsServiceClient = client.get_service("GoogleAdsService")
    retry_count: int = 0
    # Retry until we've reached MAX_RETRIES or have successfully received a
    # response.
    while True:
        try:
            stream: Iterable[SearchGoogleAdsStreamResponse] = (
                ga_service.search_stream(customer_id=customer_id, query=query)
            )
            # Returning a list of GoogleAdsRows will result in a
            # PicklingError, so instead we put the GoogleAdsRow data
            # into a list of str results and return that.
            result_strings: List[str] = []
            batch: SearchGoogleAdsStreamResponse
            for batch in stream:
                row: GoogleAdsRow
                for row in batch.results:
                    # Only the ad group query selects ad_group.id, so the
                    # prefix is left empty for the campaign query.
                    ad_group_id: str = (
                        f"Ad Group ID {row.ad_group.id} in "
                        if "ad_group.id" in query
                        else ""
                    )
                    result_string: str = (
                        f"{ad_group_id}"
                        f"Campaign ID {row.campaign.id} "
                        f"had {row.metrics.impressions} impressions "
                        f"and {row.metrics.clicks} clicks."
                    )
                    result_strings.append(result_string)
            return (True, {"results": result_strings})
        except GoogleAdsException as ex:
            # This example retries on all GoogleAdsExceptions. In practice,
            # developers might want to limit retries to only those error codes
            # they deem retriable.
            if retry_count < MAX_RETRIES:
                retry_count += 1
                # The wait grows linearly with the number of attempts so far.
                time.sleep(retry_count * BACKOFF_FACTOR)
            else:
                return (
                    False,
                    {
                        "exception": ex,
                        "customer_id": customer_id,
                        "query": query,
                    },
                )


def generate_inputs(
    client: GoogleAdsClient,
    customer_ids: List[str],
    queries: List[str],
) -> Iterable[Tuple[GoogleAdsClient, str, str]]:
    """Generates all inputs to feed into search requests.

    A GoogleAdsService instance cannot be serialized with pickle for parallel
    processing, but a GoogleAdsClient can be, so we pass the client to the
    pool task which will then get the GoogleAdsService instance.

    Args:
        client: An initialized GoogleAdsClient instance.
        customer_ids: A list of str client customer IDs.
        queries: A list of str GAQL queries.

    Returns:
        An iterable of (client, customer_id, query) tuples, one for every
        combination of customer ID and query.
    """
    return product([client], customer_ids, queries)


if __name__ == "__main__":
    parser: argparse.ArgumentParser = argparse.ArgumentParser(
        description="Download a set of reports in parallel from a list of "
        "accounts."
    )
    # The following argument(s) should be provided to run the example.
    parser.add_argument(
        "-c",
        "--customer_ids",
        nargs="+",
        type=str,
        required=True,
        help="The Google Ads customer IDs.",
    )
    parser.add_argument(
        "-l",
        "--login_customer_id",
        type=str,
        help="The login customer ID (optional).",
    )
    args: argparse.Namespace = parser.parse_args()

    # GoogleAdsClient will read the google-ads.yaml configuration file in the
    # home directory if none is specified.
    googleads_client: GoogleAdsClient = GoogleAdsClient.load_from_storage(
        version="v24"
    )
    # Override the login_customer_id on the GoogleAdsClient, if specified.
    if args.login_customer_id is not None:
        googleads_client.login_customer_id = args.login_customer_id

    main(googleads_client, args.customer_ids)