From 4a55013728dbb8c996bf9913ea7cfe2f1d13abbb Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 16:07:01 +0000 Subject: [PATCH 1/9] refactor(bigquery-jdbc): optimize thread management and unify discovery logic in `DatabaseMetaData` methods --- .../jdbc/BigQueryDatabaseMetaData.java | 385 +++++++++--------- 1 file changed, 199 insertions(+), 186 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 1b5eb0af0a4e..eff4e23aade3 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -69,7 +69,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -96,7 +95,6 @@ class BigQueryDatabaseMetaData implements DatabaseMetaData { private static final String GET_IMPORTED_KEYS_SQL = "DatabaseMetaData_GetImportedKeys.sql"; private static final String GET_EXPORTED_KEYS_SQL = "DatabaseMetaData_GetExportedKeys.sql"; private static final String GET_CROSS_REFERENCE_SQL = "DatabaseMetaData_GetCrossReference.sql"; - private static final int API_EXECUTOR_POOL_SIZE = 50; private static final int DEFAULT_PAGE_SIZE = 500; private static final int DEFAULT_QUEUE_CAPACITY = 5000; // Declared package-private for testing. @@ -138,13 +136,11 @@ class BigQueryDatabaseMetaData implements DatabaseMetaData { String URL; BigQueryConnection connection; private final BigQuery bigquery; - private final int metadataFetchThreadCount; BigQueryDatabaseMetaData(BigQueryConnection connection) { this.URL = connection.getConnectionUrl(); this.connection = connection; this.bigquery = connection.getBigQuery(); - this.metadataFetchThreadCount = connection.getMetadataFetchThreadCount(); } @Override @@ -774,10 +770,10 @@ public boolean dataDefinitionIgnoredInTransactions() { @Override public ResultSet getProcedures( String catalog, String schemaPattern, String procedureNamePattern) { - if ((catalog == null || catalog.isEmpty()) + if ((catalog != null && catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty()) || (procedureNamePattern != null && procedureNamePattern.isEmpty())) { - LOG.warning("Returning empty ResultSet as catalog is null/empty or a pattern is empty."); + LOG.warning("Returning empty ResultSet as catalog is empty or a pattern is empty."); return new BigQueryJsonResultSet(); } @@ -802,16 +798,7 @@ public ResultSet getProcedures( try { List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + fetchMatchingDatasets(catalogParam, schemaPattern, schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Finishing."); @@ -897,6 +884,13 @@ public ResultSet getProcedures( } catch (Throwable t) { LOG.severe("Unexpected error in procedure fetcher runnable: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -1023,8 +1017,8 @@ Comparator defineGetProceduresComparator(FieldList resultSchemaF public ResultSet getProcedureColumns( String catalog, String schemaPattern, String procedureNamePattern, String columnNamePattern) { - if (catalog == null || catalog.isEmpty()) { - LOG.warning("Returning empty ResultSet because catalog (project) is null or empty."); + if (catalog != null && catalog.isEmpty()) { + LOG.warning("Returning empty ResultSet because catalog (project) is empty."); return new BigQueryJsonResultSet(); } if ((schemaPattern != null && schemaPattern.isEmpty()) @@ -1056,7 +1050,7 @@ public ResultSet getProcedureColumns( try { List datasetsToScan = - fetchMatchingDatasetsForProcedureColumns(catalogParam, schemaPattern, schemaRegex); + fetchMatchingDatasets(catalogParam, schemaPattern, schemaRegex); if (datasetsToScan.isEmpty() || Thread.currentThread().isInterrupted()) { LOG.info( "Fetcher: No matching datasets or interrupted early. Catalog: " + catalogParam); @@ -1107,12 +1101,24 @@ public ResultSet getProcedureColumns( + catalogParam + ". Error: " + e.getMessage()); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(e)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + } } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { signalEndOfData(queue, resultSchema.getFields()); LOG.info("Procedure column fetcher thread finished for catalog: " + catalogParam); @@ -1127,27 +1133,6 @@ public ResultSet getProcedureColumns( return resultSet; } - private List fetchMatchingDatasetsForProcedureColumns( - String catalogParam, String schemaPattern, Pattern schemaRegex) throws InterruptedException { - LOG.fine( - "Fetching matching datasets for catalog '%s', schemaPattern '%s'", - catalogParam, schemaPattern); - List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets(catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); - LOG.info( - "Found %d datasets to scan for procedures in catalog '%s'.", - datasetsToScan.size(), catalogParam); - return datasetsToScan; - } - List listMatchingProcedureIdsFromDatasets( List datasetsToScan, String procedureNamePattern, @@ -1618,11 +1603,11 @@ public ResultSet getTables( String effectiveCatalog = effectiveIdentifiers.x(); String effectiveSchemaPattern = effectiveIdentifiers.y(); - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + if ((effectiveCatalog != null && effectiveCatalog.isEmpty()) || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) || (tableNamePattern != null && tableNamePattern.isEmpty())) { LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + "Returning empty ResultSet as one or more patterns are empty or catalog is empty."); return new BigQueryJsonResultSet(); } @@ -1652,16 +1637,7 @@ public ResultSet getTables( try { List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + fetchMatchingDatasets(catalogParam, schemaParam, schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -1744,6 +1720,13 @@ public ResultSet getTables( } catch (Throwable t) { LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -1956,12 +1939,12 @@ public ResultSet getColumns( String effectiveCatalog = effectiveIdentifiers.x(); String effectiveSchemaPattern = effectiveIdentifiers.y(); - if ((effectiveCatalog == null || effectiveCatalog.isEmpty()) + if ((effectiveCatalog != null && effectiveCatalog.isEmpty()) || (effectiveSchemaPattern != null && effectiveSchemaPattern.isEmpty()) || (tableNamePattern != null && tableNamePattern.isEmpty()) || (columnNamePattern != null && columnNamePattern.isEmpty())) { LOG.warning( - "Returning empty ResultSet as one or more patterns are empty or catalog is null."); + "Returning empty ResultSet as one or more patterns are empty or catalog is empty."); return new BigQueryJsonResultSet(); } @@ -1990,16 +1973,7 @@ public ResultSet getColumns( try { List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaParam, - schemaRegex, - LOG); + fetchMatchingDatasets(catalogParam, schemaParam, schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -2069,6 +2043,13 @@ public ResultSet getColumns( } catch (Throwable t) { LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { taskFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -3505,99 +3486,123 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce final BlockingQueue queue = new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); - final List collectedResults = Collections.synchronizedList(new ArrayList<>()); - final String catalogParam = catalog; - Runnable schemaFetcher = - () -> { - final FieldList localResultSchemaFields = resultSchemaFields; - List projectsToScanList = new ArrayList<>(); - - try { - if (catalogParam != null) { - projectsToScanList.add(catalogParam); - } else { - projectsToScanList.addAll(getAccessibleCatalogNames()); - } + if (catalog != null) { + // Single-Catalog Path: completely synchronous on caller thread + try { + List datasets = fetchMatchingDatasets(catalog, schemaPattern, schemaRegex); + List collectedResults = new ArrayList<>(); + for (Dataset dataset : datasets) { + processSchemaInfo(dataset, collectedResults, resultSchemaFields); + } + Comparator comparator = defineGetSchemasComparator(resultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + populateQueue(collectedResults, queue, resultSchemaFields); + } catch (Throwable t) { + LOG.severe("Unexpected error in synchronous getSchemas: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } + } finally { + signalEndOfData(queue, resultSchemaFields); + } + return BigQueryJsonResultSet.of(resultSchema, -1, queue, null, (Future) null); + } else { + // Multi-Catalog Path: fan out using connection-scoped metadataExecutor + Runnable multiSchemaFetcher = + () -> { + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List collectedDatasets = Collections.synchronizedList(new ArrayList<>()); + final List collectedResults = new ArrayList<>(); - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } + try { + List projectsToScanList = getAccessibleCatalogNames(); - for (String currentProjectToScan : projectsToScanList) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning( - "Schema fetcher interrupted during project iteration for project: " - + currentProjectToScan); - break; + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; } - LOG.info("Fetching schemas for project: " + currentProjectToScan); - List datasetsInProject = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - currentProjectToScan, - BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); - if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { - LOG.info( - "Fetcher thread found no matching datasets in project: " - + currentProjectToScan); - continue; + ExecutorService apiExecutor = connection.getMetadataExecutor(); + + LOG.fine("Submitting parallel fetchMatchingDatasets tasks..."); + for (String currentProjectToScan : projectsToScanList) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Fetcher interrupted during project iteration submission."); + break; + } + + Callable> apiCallable = + () -> fetchMatchingDatasets(currentProjectToScan, schemaPattern, schemaRegex); + Future> apiFuture = apiExecutor.submit(apiCallable); + apiFutures.add(apiFuture); } + LOG.fine( + "Finished submitting " + apiFutures.size() + " fetchMatchingDatasets tasks."); - LOG.fine("Processing found datasets for project: " + currentProjectToScan); - for (Dataset dataset : datasetsInProject) { + LOG.fine("Processing results from fetchMatchingDatasets tasks..."); + for (Future> apiFuture : apiFutures) { if (Thread.currentThread().isInterrupted()) { + LOG.warning("Fetcher interrupted while processing API futures."); + break; + } + try { + List datasetsResult = apiFuture.get(); + if (datasetsResult != null) { + collectedDatasets.addAll(datasetsResult); + } + } catch (ExecutionException e) { LOG.warning( - "Schema fetcher interrupted during dataset iteration for project: " - + currentProjectToScan); + "ExecutionException in fetchMatchingDatasets task: " + e.getMessage()); + } + } + + for (Dataset dataset : collectedDatasets) { + if (Thread.currentThread().isInterrupted()) { break; } processSchemaInfo(dataset, collectedResults, localResultSchemaFields); } - } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); + } catch (Throwable t) { + LOG.severe("Unexpected error in multi-schema fetcher runnable: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } + } finally { + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Multi-schema fetcher thread finished."); } - } finally { - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Schema fetcher thread finished."); - } - }; + }; - Future fetcherFuture = connection.getExecutorService().submit(schemaFetcher); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, fetcherFuture); + Future fetcherFuture = connection.getExecutorService().submit(multiSchemaFetcher); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, fetcherFuture); - LOG.info("Submitted background task for getSchemas to metadata executor"); - return resultSet; + LOG.info("Submitted background task for multi-catalog getSchemas to query executor"); + return resultSet; + } } Schema defineGetSchemasSchema() { @@ -3738,11 +3743,11 @@ Schema defineGetClientInfoPropertiesSchema() { @Override public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) { - if ((catalog == null || catalog.isEmpty()) + if ((catalog != null && catalog.isEmpty()) || (schemaPattern != null && schemaPattern.isEmpty()) || (functionNamePattern != null && functionNamePattern.isEmpty())) { LOG.warning( - "Returning empty ResultSet as catalog is null/empty or a pattern is empty for" + "Returning empty ResultSet as catalog is empty or a pattern is empty for" + " getFunctions."); return new BigQueryJsonResultSet(); } @@ -3768,16 +3773,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct try { List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + fetchMatchingDatasets(catalogParam, schemaPattern, schemaRegex); if (datasetsToScan.isEmpty()) { LOG.info("Fetcher thread found no matching datasets. Returning empty resultset."); @@ -3864,6 +3860,13 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct populateQueue(collectedResults, queue, localResultSchemaFields); } catch (Throwable t) { LOG.severe("Unexpected error in function fetcher runnable: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -3970,8 +3973,8 @@ Comparator defineGetFunctionsComparator(FieldList resultSchemaFi @Override public ResultSet getFunctionColumns( String catalog, String schemaPattern, String functionNamePattern, String columnNamePattern) { - if (catalog == null || catalog.isEmpty()) { - LOG.warning("Returning empty ResultSet catalog (project) is null or empty."); + if (catalog != null && catalog.isEmpty()) { + LOG.warning("Returning empty ResultSet catalog (project) is empty."); return new BigQueryJsonResultSet(); } if ((schemaPattern != null && schemaPattern.isEmpty()) @@ -4004,16 +4007,7 @@ public ResultSet getFunctionColumns( try { List datasetsToScan = - findMatchingBigQueryObjects( - "Dataset", - () -> - bigquery.listDatasets( - catalogParam, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(catalogParam, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); + fetchMatchingDatasets(catalogParam, schemaPattern, schemaRegex); if (datasetsToScan.isEmpty() || Thread.currentThread().isInterrupted()) { LOG.info( @@ -4065,12 +4059,24 @@ public ResultSet getFunctionColumns( + catalogParam + ". Error: " + e.getMessage()); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(e)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + } } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } } finally { signalEndOfData(queue, resultSchemaFields); LOG.info("Function column fetcher thread finished for catalog: " + catalogParam); @@ -4950,33 +4956,40 @@ private void signalEndOfData( } } - private void shutdownExecutor(ExecutorService executor) { - if (executor == null || executor.isShutdown()) { - return; - } - LOG.info("Shutting down column executor service..."); - executor.shutdown(); - try { - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { - LOG.warning("Executor did not terminate gracefully after 10s, forcing shutdownNow()."); - List droppedTasks = executor.shutdownNow(); - LOG.warning( - "Executor shutdownNow() initiated. Dropped tasks count: " + droppedTasks.size()); - if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { - LOG.severe("Executor did not terminate even after shutdownNow()."); - } - } - LOG.info("Executor shutdown complete."); - } catch (InterruptedException ie) { - LOG.warning( - "Interrupted while waiting for executor termination. Forcing shutdownNow() again."); - executor.shutdownNow(); - Thread.currentThread().interrupt(); + private String getCurrentCatalogName() { + return this.connection.getCatalog(); + } + + private List getProjectsToScan(String catalog) throws SQLException { + if (catalog != null) { + return Collections.singletonList(catalog); + } else { + return getAccessibleCatalogNames(); } } - private String getCurrentCatalogName() { - return this.connection.getCatalog(); + private List fetchMatchingDatasets( + String catalog, String schemaPattern, Pattern schemaRegex) throws SQLException { + List projects = getProjectsToScan(catalog); + List allDatasets = new ArrayList<>(); + for (String project : projects) { + if (Thread.currentThread().isInterrupted()) { + break; + } + List datasets = + findMatchingBigQueryObjects( + "Dataset", + () -> bigquery.listDatasets(project, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(project, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + if (datasets != null) { + allDatasets.addAll(datasets); + } + } + return allDatasets; } private List getAccessibleCatalogNames() throws SQLException { From 3da27408e2e5347b8d022b00e62250fb62e68ea9 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 16:22:37 +0000 Subject: [PATCH 2/9] chore: address pr feedback --- .../jdbc/BigQueryDatabaseMetaData.java | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index eff4e23aade3..4ec56977d4f5 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -1105,6 +1105,7 @@ public ResultSet getProcedureColumns( queue.put(BigQueryFieldValueListWrapper.ofError(e)); } catch (InterruptedException ie) { LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); } } catch (Throwable t) { LOG.severe( @@ -3517,7 +3518,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce () -> { final FieldList localResultSchemaFields = resultSchemaFields; final List>> apiFutures = new ArrayList<>(); - final List collectedDatasets = Collections.synchronizedList(new ArrayList<>()); + final List collectedDatasets = new ArrayList<>(); final List collectedResults = new ArrayList<>(); try { @@ -3591,6 +3592,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce Thread.currentThread().interrupt(); } } finally { + apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); LOG.info("Multi-schema fetcher thread finished."); } @@ -4063,6 +4065,7 @@ public ResultSet getFunctionColumns( queue.put(BigQueryFieldValueListWrapper.ofError(e)); } catch (InterruptedException ie) { LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); } } catch (Throwable t) { LOG.severe( @@ -4976,17 +4979,22 @@ private List fetchMatchingDatasets( if (Thread.currentThread().isInterrupted()) { break; } - List datasets = - findMatchingBigQueryObjects( - "Dataset", - () -> bigquery.listDatasets(project, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), - (name) -> bigquery.getDataset(DatasetId.of(project, name)), - (ds) -> ds.getDatasetId().getDataset(), - schemaPattern, - schemaRegex, - LOG); - if (datasets != null) { - allDatasets.addAll(datasets); + try { + List datasets = + findMatchingBigQueryObjects( + "Dataset", + () -> bigquery.listDatasets(project, DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), + (name) -> bigquery.getDataset(DatasetId.of(project, name)), + (ds) -> ds.getDatasetId().getDataset(), + schemaPattern, + schemaRegex, + LOG); + if (datasets != null) { + allDatasets.addAll(datasets); + } + } catch (Throwable t) { + LOG.warning( + "Failed to fetch matching datasets for project " + project + ": " + t.getMessage()); } } return allDatasets; From 50e4ec5cd91bd7a8742c0d9dcafbd01ec43056ad Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 16:49:21 +0000 Subject: [PATCH 3/9] chore: move error handling to helper method --- .../jdbc/BigQueryDatabaseMetaData.java | 105 ++++-------------- 1 file changed, 23 insertions(+), 82 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 4ec56977d4f5..251cbfae0bd4 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -884,13 +884,7 @@ public ResultSet getProcedures( } catch (Throwable t) { LOG.severe("Unexpected error in procedure fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -1101,25 +1095,14 @@ public ResultSet getProcedureColumns( + catalogParam + ". Error: " + e.getMessage()); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(e)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, e); } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { signalEndOfData(queue, resultSchema.getFields()); LOG.info("Procedure column fetcher thread finished for catalog: " + catalogParam); @@ -1721,13 +1704,7 @@ public ResultSet getTables( } catch (Throwable t) { LOG.severe("Unexpected error in table fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -2044,13 +2021,7 @@ public ResultSet getColumns( } catch (Throwable t) { LOG.severe("Unexpected error in column fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { taskFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -3501,13 +3472,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce populateQueue(collectedResults, queue, resultSchemaFields); } catch (Throwable t) { LOG.severe("Unexpected error in synchronous getSchemas: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { signalEndOfData(queue, resultSchemaFields); } @@ -3584,13 +3549,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce } catch (Throwable t) { LOG.severe("Unexpected error in multi-schema fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -3862,13 +3821,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct populateQueue(collectedResults, queue, localResultSchemaFields); } catch (Throwable t) { LOG.severe("Unexpected error in function fetcher runnable: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { apiFutures.forEach(f -> f.cancel(true)); signalEndOfData(queue, localResultSchemaFields); @@ -4061,25 +4014,14 @@ public ResultSet getFunctionColumns( + catalogParam + ". Error: " + e.getMessage()); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(e)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, e); } catch (Throwable t) { LOG.severe( "Fetcher: Unexpected error in main try block for catalog " + catalogParam + ". Error: " + t.getMessage()); - Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); - try { - queue.put(BigQueryFieldValueListWrapper.ofError(ex)); - } catch (InterruptedException ie) { - LOG.warning("Failed to put exception to queue due to interruption."); - Thread.currentThread().interrupt(); - } + writeErrorToQueue(queue, t); } finally { signalEndOfData(queue, resultSchemaFields); LOG.info("Function column fetcher thread finished for catalog: " + catalogParam); @@ -4959,21 +4901,10 @@ private void signalEndOfData( } } - private String getCurrentCatalogName() { - return this.connection.getCatalog(); - } - - private List getProjectsToScan(String catalog) throws SQLException { - if (catalog != null) { - return Collections.singletonList(catalog); - } else { - return getAccessibleCatalogNames(); - } - } - private List fetchMatchingDatasets( String catalog, String schemaPattern, Pattern schemaRegex) throws SQLException { - List projects = getProjectsToScan(catalog); + List projects = + (catalog != null) ? Collections.singletonList(catalog) : getAccessibleCatalogNames(); List allDatasets = new ArrayList<>(); for (String project : projects) { if (Thread.currentThread().isInterrupted()) { @@ -5002,7 +4933,7 @@ private List fetchMatchingDatasets( private List getAccessibleCatalogNames() throws SQLException { Set accessibleCatalogs = new HashSet<>(); - String primaryCatalog = getCurrentCatalogName(); + String primaryCatalog = this.connection.getCatalog(); if (primaryCatalog != null && !primaryCatalog.isEmpty()) { accessibleCatalogs.add(primaryCatalog); } @@ -5047,4 +4978,14 @@ static String readSqlFromFile(String filename) { String replaceSqlParameters(String sql, String... params) throws SQLException { return String.format(sql, (Object[]) params); } + + private void writeErrorToQueue(BlockingQueue queue, Throwable t) { + Exception ex = (t instanceof Exception) ? (Exception) t : new Exception(t); + try { + queue.put(BigQueryFieldValueListWrapper.ofError(ex)); + } catch (InterruptedException ie) { + LOG.warning("Failed to put exception to queue due to interruption."); + Thread.currentThread().interrupt(); + } + } } From 9ced8281c54ffea2167eaad12556a2d4bd44be76 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 17:10:08 +0000 Subject: [PATCH 4/9] fix NPE for getSchemas sync call --- .../google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java | 2 +- .../com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 251cbfae0bd4..39a7f801f986 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -3476,7 +3476,7 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce } finally { signalEndOfData(queue, resultSchemaFields); } - return BigQueryJsonResultSet.of(resultSchema, -1, queue, null, (Future) null); + return BigQueryJsonResultSet.of(resultSchema, -1, queue, null); } else { // Multi-Catalog Path: fan out using connection-scoped metadataExecutor Runnable multiSchemaFetcher = diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java index b36a3e018823..998a189eae2b 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryJsonResultSet.java @@ -311,7 +311,9 @@ public void close() { this.isClosed = true; if (ownedTasks != null) { for (Future ownedTask : ownedTasks) { - ownedTask.cancel(true); + if (ownedTask != null) { + ownedTask.cancel(true); + } } } super.close(); From 8b9d901fa05d9e6ae2e23559ef0a7bfe33565115 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 17:14:50 +0000 Subject: [PATCH 5/9] lint --- .../jdbc/BigQueryDatabaseMetaData.java | 138 +++++++++--------- 1 file changed, 68 insertions(+), 70 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 39a7f801f986..672730cc9981 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -3477,93 +3477,91 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce signalEndOfData(queue, resultSchemaFields); } return BigQueryJsonResultSet.of(resultSchema, -1, queue, null); - } else { - // Multi-Catalog Path: fan out using connection-scoped metadataExecutor - Runnable multiSchemaFetcher = - () -> { - final FieldList localResultSchemaFields = resultSchemaFields; - final List>> apiFutures = new ArrayList<>(); - final List collectedDatasets = new ArrayList<>(); - final List collectedResults = new ArrayList<>(); + } - try { - List projectsToScanList = getAccessibleCatalogNames(); + // Multi-Catalog Path: fan out using connection-scoped metadataExecutor + Runnable multiSchemaFetcher = + () -> { + final FieldList localResultSchemaFields = resultSchemaFields; + final List>> apiFutures = new ArrayList<>(); + final List collectedDatasets = new ArrayList<>(); + final List collectedResults = new ArrayList<>(); - if (projectsToScanList.isEmpty()) { - LOG.info( - "No valid projects to scan (primary, specified, or additional). Returning empty" - + " resultset."); - return; - } + try { + List projectsToScanList = getAccessibleCatalogNames(); - ExecutorService apiExecutor = connection.getMetadataExecutor(); + if (projectsToScanList.isEmpty()) { + LOG.info( + "No valid projects to scan (primary, specified, or additional). Returning empty" + + " resultset."); + return; + } - LOG.fine("Submitting parallel fetchMatchingDatasets tasks..."); - for (String currentProjectToScan : projectsToScanList) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted during project iteration submission."); - break; - } + ExecutorService apiExecutor = connection.getMetadataExecutor(); - Callable> apiCallable = - () -> fetchMatchingDatasets(currentProjectToScan, schemaPattern, schemaRegex); - Future> apiFuture = apiExecutor.submit(apiCallable); - apiFutures.add(apiFuture); + LOG.fine("Submitting parallel fetchMatchingDatasets tasks..."); + for (String currentProjectToScan : projectsToScanList) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Fetcher interrupted during project iteration submission."); + break; } - LOG.fine( - "Finished submitting " + apiFutures.size() + " fetchMatchingDatasets tasks."); - LOG.fine("Processing results from fetchMatchingDatasets tasks..."); - for (Future> apiFuture : apiFutures) { - if (Thread.currentThread().isInterrupted()) { - LOG.warning("Fetcher interrupted while processing API futures."); - break; - } - try { - List datasetsResult = apiFuture.get(); - if (datasetsResult != null) { - collectedDatasets.addAll(datasetsResult); - } - } catch (ExecutionException e) { - LOG.warning( - "ExecutionException in fetchMatchingDatasets task: " + e.getMessage()); - } - } + Callable> apiCallable = + () -> fetchMatchingDatasets(currentProjectToScan, schemaPattern, schemaRegex); + Future> apiFuture = apiExecutor.submit(apiCallable); + apiFutures.add(apiFuture); + } + LOG.fine("Finished submitting " + apiFutures.size() + " fetchMatchingDatasets tasks."); - for (Dataset dataset : collectedDatasets) { - if (Thread.currentThread().isInterrupted()) { - break; + LOG.fine("Processing results from fetchMatchingDatasets tasks..."); + for (Future> apiFuture : apiFutures) { + if (Thread.currentThread().isInterrupted()) { + LOG.warning("Fetcher interrupted while processing API futures."); + break; + } + try { + List datasetsResult = apiFuture.get(); + if (datasetsResult != null) { + collectedDatasets.addAll(datasetsResult); } - processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + } catch (ExecutionException e) { + LOG.warning("ExecutionException in fetchMatchingDatasets task: " + e.getMessage()); } + } - if (!Thread.currentThread().isInterrupted()) { - Comparator comparator = - defineGetSchemasComparator(localResultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); + for (Dataset dataset : collectedDatasets) { + if (Thread.currentThread().isInterrupted()) { + break; } + processSchemaInfo(dataset, collectedResults, localResultSchemaFields); + } - if (!Thread.currentThread().isInterrupted()) { - populateQueue(collectedResults, queue, localResultSchemaFields); - } + if (!Thread.currentThread().isInterrupted()) { + Comparator comparator = + defineGetSchemasComparator(localResultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + } - } catch (Throwable t) { - LOG.severe("Unexpected error in multi-schema fetcher runnable: " + t.getMessage()); - writeErrorToQueue(queue, t); - } finally { - apiFutures.forEach(f -> f.cancel(true)); - signalEndOfData(queue, localResultSchemaFields); - LOG.info("Multi-schema fetcher thread finished."); + if (!Thread.currentThread().isInterrupted()) { + populateQueue(collectedResults, queue, localResultSchemaFields); } - }; - Future fetcherFuture = connection.getExecutorService().submit(multiSchemaFetcher); - BigQueryJsonResultSet resultSet = - BigQueryJsonResultSet.of(resultSchema, -1, queue, null, fetcherFuture); + } catch (Throwable t) { + LOG.severe("Unexpected error in multi-schema fetcher runnable: " + t.getMessage()); + writeErrorToQueue(queue, t); + } finally { + apiFutures.forEach(f -> f.cancel(true)); + signalEndOfData(queue, localResultSchemaFields); + LOG.info("Multi-schema fetcher thread finished."); + } + }; + + Future fetcherFuture = connection.getExecutorService().submit(multiSchemaFetcher); + BigQueryJsonResultSet resultSet = + BigQueryJsonResultSet.of(resultSchema, -1, queue, null, fetcherFuture); - LOG.info("Submitted background task for multi-catalog getSchemas to query executor"); - return resultSet; - } + LOG.info("Submitted background task for multi-catalog getSchemas to query executor"); + return resultSet; } Schema defineGetSchemasSchema() { From 0aacf50f4138a7686979be7ab9f0f3c36b598fd2 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 17:39:39 +0000 Subject: [PATCH 6/9] throw error when catalog is not null --- .../cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 672730cc9981..91261bcd961a 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -3457,7 +3457,9 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce final FieldList resultSchemaFields = resultSchema.getFields(); final BlockingQueue queue = - new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); + (catalog != null) + ? new LinkedBlockingQueue<>() + : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY); if (catalog != null) { // Single-Catalog Path: completely synchronous on caller thread @@ -4921,9 +4923,12 @@ private List fetchMatchingDatasets( if (datasets != null) { allDatasets.addAll(datasets); } - } catch (Throwable t) { + } catch (Exception e) { + if (catalog != null) { + throw new SQLException("Failed to fetch matching datasets for project " + project, e); + } LOG.warning( - "Failed to fetch matching datasets for project " + project + ": " + t.getMessage()); + "Failed to fetch matching datasets for project " + project + ": " + e.getMessage()); } } return allDatasets; From ed113d66e07f4a077323ff310571e19fc3d2f382 Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 17:47:36 +0000 Subject: [PATCH 7/9] fix failing IT --- .../cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java index 789ccced452c..15880f74795b 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java @@ -792,8 +792,10 @@ public void testDatabaseMetadataGetSchemas() throws SQLException { Assertions.assertFalse(rsNoMatch.next()); // Test case 4: Get schemas with non-existent catalog - rsNoMatch = databaseMetaData.getSchemas("invalid-catalog", null); - Assertions.assertFalse(rsNoMatch.next()); + Assertions.assertThrows( + SQLException.class, + () -> databaseMetaData.getSchemas("invalid-catalog", null), + "Should throw SQLException for non-existent catalog"); connection.close(); } From aba306f3a8866d6780ac17259d979b076d676f2a Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 18:46:14 +0000 Subject: [PATCH 8/9] fix failing tests --- .../cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java | 9 ++++++++- .../bigquery/jdbc/BigQueryDatabaseMetaDataTest.java | 6 +++--- .../cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java | 4 ++-- 3 files changed, 13 insertions(+), 6 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 91261bcd961a..33707171466c 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -4585,7 +4585,8 @@ List findMatchingBigQueryObjects( Function nameExtractor, String pattern, Pattern regex, - BigQueryJdbcCustomLogger logger) { + BigQueryJdbcCustomLogger logger) + throws InterruptedException { boolean needsList = needsListing(pattern); List resultList = new ArrayList<>(); @@ -4639,14 +4640,17 @@ List findMatchingBigQueryObjects( logger.warning( "BigQueryException finding %ss for pattern '%s': %s (Code: %d)", objectTypeName, pattern, e.getMessage(), e.getCode()); + throw e; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); logger.warning("Interrupted while finding " + objectTypeName + "s."); + throw e; } catch (Exception e) { logger.severe( "Unexpected exception finding %ss for pattern '%s': %s", objectTypeName, pattern, e.getMessage()); + throw new RuntimeException(e); } return resultList; } @@ -4923,6 +4927,9 @@ private List fetchMatchingDatasets( if (datasets != null) { allDatasets.addAll(datasets); } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } catch (Exception e) { if (catalog != null) { throw new SQLException("Failed to fetch matching datasets for project " + project, e); diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java index 3a97f41924a8..2b72ed01b8a3 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaDataTest.java @@ -1017,7 +1017,7 @@ public void testSortResults_Procedures_EmptyList() { } @Test - public void testFindMatchingBigQueryObjects_Routines_ListWithPattern() { + public void testFindMatchingBigQueryObjects_Routines_ListWithPattern() throws Exception { String catalog = "p-cat"; String schema = "d-sch"; String pattern = "proc_%"; @@ -1063,7 +1063,7 @@ public void testFindMatchingBigQueryObjects_Routines_ListWithPattern() { } @Test - public void testFindMatchingBigQueryObjects_Routines_ListNoPattern() { + public void testFindMatchingBigQueryObjects_Routines_ListNoPattern() throws Exception { String catalog = "p-cat"; String schema = "d-sch"; String pattern = null; @@ -1102,7 +1102,7 @@ public void testFindMatchingBigQueryObjects_Routines_ListNoPattern() { } @Test - public void testFindMatchingBigQueryObjects_Routines_GetSpecific() { + public void testFindMatchingBigQueryObjects_Routines_GetSpecific() throws Exception { String catalog = "p-cat"; String schema = "d-sch"; String procNameExact = "exactprocname"; diff --git a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java index 15880f74795b..e40ba8bccd8c 100644 --- a/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java +++ b/java-bigquery-jdbc/src/test/java/com/google/cloud/bigquery/jdbc/it/ITDatabaseMetadataTest.java @@ -937,9 +937,9 @@ public void testDatabaseMetaDataGetFunctions() throws SQLException { rsEmptyFunction.next(), "Empty function name pattern should return no results"); rsEmptyFunction.close(); - // Test 9: Null catalog + // Test 9: Null catalog should return all functions (spec-compliant) ResultSet rsNullCatalog = databaseMetaData.getFunctions(null, testSchema, null); - Assertions.assertFalse(rsNullCatalog.next(), "Null catalog should return no results"); + Assertions.assertTrue(rsNullCatalog.next(), "Null catalog should return results"); rsNullCatalog.close(); connection.close(); } From ec24a067a11cdc7a9c6bedb9ce6467b66c165e7c Mon Sep 17 00:00:00 2001 From: Keshav Dandeva Date: Fri, 26 Jun 2026 19:18:03 +0000 Subject: [PATCH 9/9] fix error propagation --- .../jdbc/BigQueryDatabaseMetaData.java | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java index 33707171466c..839cba297b9c 100644 --- a/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java +++ b/java-bigquery-jdbc/src/main/java/com/google/cloud/bigquery/jdbc/BigQueryDatabaseMetaData.java @@ -3463,21 +3463,15 @@ public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLExce if (catalog != null) { // Single-Catalog Path: completely synchronous on caller thread - try { - List datasets = fetchMatchingDatasets(catalog, schemaPattern, schemaRegex); - List collectedResults = new ArrayList<>(); - for (Dataset dataset : datasets) { - processSchemaInfo(dataset, collectedResults, resultSchemaFields); - } - Comparator comparator = defineGetSchemasComparator(resultSchemaFields); - sortResults(collectedResults, comparator, "getSchemas", LOG); - populateQueue(collectedResults, queue, resultSchemaFields); - } catch (Throwable t) { - LOG.severe("Unexpected error in synchronous getSchemas: " + t.getMessage()); - writeErrorToQueue(queue, t); - } finally { - signalEndOfData(queue, resultSchemaFields); + List datasets = fetchMatchingDatasets(catalog, schemaPattern, schemaRegex); + List collectedResults = new ArrayList<>(); + for (Dataset dataset : datasets) { + processSchemaInfo(dataset, collectedResults, resultSchemaFields); } + Comparator comparator = defineGetSchemasComparator(resultSchemaFields); + sortResults(collectedResults, comparator, "getSchemas", LOG); + populateQueue(collectedResults, queue, resultSchemaFields); + signalEndOfData(queue, resultSchemaFields); return BigQueryJsonResultSet.of(resultSchema, -1, queue, null); }