diff options
| -rw-r--r-- | app/workers/workbench_import_worker.rb | 21 | ||||
| -rw-r--r-- | db/schema.rb | 20 | ||||
| -rw-r--r-- | spec/services/zip_service/zip_output_streams_spec.rb | 4 | ||||
| -rw-r--r-- | spec/workers/workbench_import_worker_spec.rb | 80 |
4 files changed, 86 insertions, 39 deletions
diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb index b7668c976..3f3d40a00 100644 --- a/app/workers/workbench_import_worker.rb +++ b/app/workers/workbench_import_worker.rb @@ -11,6 +11,7 @@ class WorkbenchImportWorker def perform(import_id) @import = Import.find(import_id) @response = nil + @import.update_attributes(status: 'running') downloaded = download zip_service = ZipService.new(downloaded) upload zip_service @@ -45,18 +46,24 @@ class WorkbenchImportWorker def try_upload_entry_group eg_name, eg_stream result = execute_post eg_name, eg_stream - return result if result.status < 400 + return result if result && result.status < 400 @response = result.body try_again end def upload zip_service - zip_service.entry_group_streams.each(&method(:upload_entry_group)) - end - - def upload_entry_group key_pair - retry_service = RetryService.new(delay: RETRY_DELAYS, rescue_from: HTTPService::Timeout, &method(:log_failure)) - retry_service.execute(&upload_entry_group_proc(key_pair)) + entry_group_streams = zip_service.entry_group_streams + @import.update_attributes total_steps: entry_group_streams.size + entry_group_streams.each_with_index(&method(:upload_entry_group)) + rescue StopIteration + @import.update_attributes( current_step: entry_group_streams.size, status: 'failed' ) + end + + def upload_entry_group key_pair, element_count + @import.update_attributes( current_step: element_count.succ ) + retry_service = RetryService.new(delays: RETRY_DELAYS, rescue_from: HTTPService::Timeout, &method(:log_failure)) + status, _ = retry_service.execute(&upload_entry_group_proc(key_pair)) + raise StopIteration unless status == :ok end def upload_entry_group_proc key_pair diff --git a/db/schema.rb b/db/schema.rb index 01af2771d..05a024e1d 100644 --- a/db/schema.rb +++ b/db/schema.rb @@ -166,22 +166,6 @@ ActiveRecord::Schema.define(version: 20170727130705) do add_index "connection_links", ["objectid"], name: "connection_links_objectid_key", unique: true, using: :btree - create_table "delayed_jobs", id: :bigserial, force: :cascade do |t| - t.integer "priority", default: 0 - t.integer "attempts", default: 0 - t.text "handler" - t.text "last_error" - t.datetime "run_at" - t.datetime "locked_at" - t.datetime "failed_at" - t.string "locked_by", limit: 255 - t.string "queue", limit: 255 - t.datetime "created_at" - t.datetime "updated_at" - end - - add_index "delayed_jobs", ["priority", "run_at"], name: "delayed_jobs_priority", using: :btree - create_table "exports", id: :bigserial, force: :cascade do |t| t.integer "referential_id", limit: 8 t.string "status" @@ -851,13 +835,9 @@ ActiveRecord::Schema.define(version: 20170727130705) do add_foreign_key "access_links", "access_points", name: "aclk_acpt_fkey" add_foreign_key "group_of_lines_lines", "group_of_lines", name: "groupofline_group_fkey", on_delete: :cascade - add_foreign_key "journey_frequencies", "timebands", name: "journey_frequencies_timeband_id_fk", on_delete: :nullify add_foreign_key "journey_frequencies", "timebands", on_delete: :nullify - add_foreign_key "journey_frequencies", "vehicle_journeys", name: "journey_frequencies_vehicle_journey_id_fk", on_delete: :nullify add_foreign_key "journey_frequencies", "vehicle_journeys", on_delete: :nullify - add_foreign_key "journey_pattern_sections", "journey_patterns", name: "journey_pattern_sections_journey_pattern_id_fk", on_delete: :cascade add_foreign_key "journey_pattern_sections", "journey_patterns", on_delete: :cascade - add_foreign_key "journey_pattern_sections", "route_sections", name: "journey_pattern_sections_route_section_id_fk", on_delete: :cascade add_foreign_key "journey_pattern_sections", "route_sections", on_delete: :cascade add_foreign_key "journey_patterns", "routes", name: "jp_route_fkey", on_delete: :cascade add_foreign_key "journey_patterns", "stop_points", column: "arrival_stop_point_id", name: "arrival_point_fkey", on_delete: :nullify diff --git a/spec/services/zip_service/zip_output_streams_spec.rb b/spec/services/zip_service/zip_output_streams_spec.rb index 0b7a1e99b..b99e7bc2d 100644 --- a/spec/services/zip_service/zip_output_streams_spec.rb +++ b/spec/services/zip_service/zip_output_streams_spec.rb @@ -14,4 +14,8 @@ RSpec.describe ZipService do expect( ref1_lines ).to eq %w(multiref/ref1/ multiref/ref1/datum-1 multiref/ref1/datum-2) expect( ref2_lines ).to eq %w(multiref/ref2/ multiref/ref2/datum-1 multiref/ref2/datum-2) end + + it "exposes it's size" do + expect( subject.entry_group_streams.size ).to eq(2) + end end diff --git a/spec/workers/workbench_import_worker_spec.rb b/spec/workers/workbench_import_worker_spec.rb index 68e429b60..b6057b36a 100644 --- a/spec/workers/workbench_import_worker_spec.rb +++ b/spec/workers/workbench_import_worker_spec.rb @@ -15,48 +15,104 @@ RSpec.describe WorkbenchImportWorker, type: [:worker, :request] do let( :downloaded_zip ){ double("downloaded zip") } let( :download_token ){ SecureRandom.urlsafe_base64 } + let( :upload_path ) { '/api/v1/netex_imports.json' } let( :entry_group_streams ) do - 2.times.map{ |i| double( "entry group stream #{i}" ) } + entry_count.times.map{ |i| double( "entry group stream #{i}" ) } end let( :entry_groups ) do - 2.times.map do | i | + entry_count.times.map do | i | {"entry_group_name#{i}" => entry_group_streams[i] } end end let( :zip_service ){ double("zip service") } + let( :zip_file ){ File.join(fixture_path, 'multiref.zip') } + + let( :post_response_ok ){ response(status: 201, boody: "{}") } before do + # Silence Logger + allow_any_instance_of(Logger).to receive(:info) + allow_any_instance_of(Logger).to receive(:warn) + # That should be `build_stubbed's` job, no? allow(Import).to receive(:find).with(import.id).and_return(import) + allow(Api::V1::ApiKey).to receive(:from).and_return(api_key) allow(ZipService).to receive(:new).with(downloaded_zip).and_return zip_service expect(zip_service).to receive(:entry_group_streams).and_return(entry_groups) + expect( import ).to receive(:update_attributes).with(status: 'running') end - context 'multireferential zipfile' do - let( :zip_file ){ File.join(fixture_path, 'multiref.zip') } + context 'multireferential zipfile, no errors' do + let( :entry_count ){ 2 } + + it 'downloads a zip file, cuts it, and uploads all pieces' do - it 'downloads a zip file' do expect(HTTPService).to receive(:get_resource) .with(host: host, path: path, params: {token: download_token}) .and_return( downloaded_zip ) entry_groups.each do | entry_group_name, entry_group_stream | - expect( HTTPService ).to receive(:post_resource) - .with(host: host, - path: upload_path, - resource_name: 'netex_import', - token: api_key.token, - params: params, - upload: {file: [entry_group_stream, 'application/zip', entry_group_name]}) + mock_post entry_group_name, entry_group_stream, post_response_ok + end + + expect( import ).to receive(:update_attributes).with(total_steps: 2) + expect( import ).to receive(:update_attributes).with(current_step: 1) + expect( import ).to receive(:update_attributes).with(current_step: 2) + + worker.perform import.id + + end + end + + context 'multireferential zipfile with error' do + let( :entry_count ){ 3 } + let( :post_response_failure ){ response(status: 406, boody: {error: 'What was you thinking'}) } + + it 'downloads a zip file, cuts it, and uploads some pieces' do + expect(HTTPService).to receive(:get_resource) + .with(host: host, path: path, params: {token: download_token}) + .and_return( downloaded_zip ) + + # First entry_group succeeds + entry_groups[0..0].each do | entry_group_name, entry_group_stream | + mock_post entry_group_name, entry_group_stream, post_response_ok end + + # Second entry_group fails (M I S E R A B L Y) + entry_groups[1..1].each do | entry_group_name, entry_group_stream | + mock_post entry_group_name, entry_group_stream, post_response_failure + WorkbenchImportWorker::RETRY_DELAYS.each do | delay | + mock_post entry_group_name, entry_group_stream, post_response_failure + expect_any_instance_of(RetryService).to receive(:sleep).with(delay) + end + end + + expect( import ).to receive(:update_attributes).with(total_steps: 3) + expect( import ).to receive(:update_attributes).with(current_step: 1) + expect( import ).to receive(:update_attributes).with(current_step: 2) + expect( import ).to receive(:update_attributes).with(current_step: 3, status: 'failed') + worker.perform import.id end end + def mock_post entry_group_name, entry_group_stream, response + expect( HTTPService ).to receive(:post_resource) + .with(host: host, + path: upload_path, + resource_name: 'netex_import', + token: api_key.token, + params: params, + upload: {file: [entry_group_stream, 'application/zip', entry_group_name]}) + .and_return(response) + end + def response(**opts) + OpenStruct.new(opts) + end end |
