aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRobert2017-07-27 18:21:59 +0200
committerRobert2017-07-27 22:28:22 +0200
commitaa5028a21f28a2bee9f64b5e87e70828c9c8b75f (patch)
treeec36385cad327418b8ec755f8df67487e2a87ca8
parentcbfa054cf35a2e81790e9c83f6edc2b3ff58ae84 (diff)
downloadchouette-core-aa5028a21f28a2bee9f64b5e87e70828c9c8b75f.tar.bz2
Refs: 3507@2.5h; Speccing WorkbenchImportWorker failure and DB behavior
- specced and implemented Import instance behavior (status, total_steps, current_step) - failure behavior and its impact on the above
-rw-r--r--app/workers/workbench_import_worker.rb21
-rw-r--r--db/schema.rb20
-rw-r--r--spec/services/zip_service/zip_output_streams_spec.rb4
-rw-r--r--spec/workers/workbench_import_worker_spec.rb80
4 files changed, 86 insertions, 39 deletions
diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb
index b7668c976..3f3d40a00 100644
--- a/app/workers/workbench_import_worker.rb
+++ b/app/workers/workbench_import_worker.rb
@@ -11,6 +11,7 @@ class WorkbenchImportWorker
def perform(import_id)
@import = Import.find(import_id)
@response = nil
+ @import.update_attributes(status: 'running')
downloaded = download
zip_service = ZipService.new(downloaded)
upload zip_service
@@ -45,18 +46,24 @@ class WorkbenchImportWorker
def try_upload_entry_group eg_name, eg_stream
result = execute_post eg_name, eg_stream
- return result if result.status < 400
+ return result if result && result.status < 400
@response = result.body
try_again
end
def upload zip_service
- zip_service.entry_group_streams.each(&method(:upload_entry_group))
- end
-
- def upload_entry_group key_pair
- retry_service = RetryService.new(delay: RETRY_DELAYS, rescue_from: HTTPService::Timeout, &method(:log_failure))
- retry_service.execute(&upload_entry_group_proc(key_pair))
+ entry_group_streams = zip_service.entry_group_streams
+ @import.update_attributes total_steps: entry_group_streams.size
+ entry_group_streams.each_with_index(&method(:upload_entry_group))
+ rescue StopIteration
+ @import.update_attributes( current_step: entry_group_streams.size, status: 'failed' )
+ end
+
+ def upload_entry_group key_pair, element_count
+ @import.update_attributes( current_step: element_count.succ )
+ retry_service = RetryService.new(delays: RETRY_DELAYS, rescue_from: HTTPService::Timeout, &method(:log_failure))
+ status, _ = retry_service.execute(&upload_entry_group_proc(key_pair))
+ raise StopIteration unless status == :ok
end
def upload_entry_group_proc key_pair
diff --git a/db/schema.rb b/db/schema.rb
index 01af2771d..05a024e1d 100644
--- a/db/schema.rb
+++ b/db/schema.rb
@@ -166,22 +166,6 @@ ActiveRecord::Schema.define(version: 20170727130705) do
add_index "connection_links", ["objectid"], name: "connection_links_objectid_key", unique: true, using: :btree
- create_table "delayed_jobs", id: :bigserial, force: :cascade do |t|
- t.integer "priority", default: 0
- t.integer "attempts", default: 0
- t.text "handler"
- t.text "last_error"
- t.datetime "run_at"
- t.datetime "locked_at"
- t.datetime "failed_at"
- t.string "locked_by", limit: 255
- t.string "queue", limit: 255
- t.datetime "created_at"
- t.datetime "updated_at"
- end
-
- add_index "delayed_jobs", ["priority", "run_at"], name: "delayed_jobs_priority", using: :btree
-
create_table "exports", id: :bigserial, force: :cascade do |t|
t.integer "referential_id", limit: 8
t.string "status"
@@ -851,13 +835,9 @@ ActiveRecord::Schema.define(version: 20170727130705) do
add_foreign_key "access_links", "access_points", name: "aclk_acpt_fkey"
add_foreign_key "group_of_lines_lines", "group_of_lines", name: "groupofline_group_fkey", on_delete: :cascade
- add_foreign_key "journey_frequencies", "timebands", name: "journey_frequencies_timeband_id_fk", on_delete: :nullify
add_foreign_key "journey_frequencies", "timebands", on_delete: :nullify
- add_foreign_key "journey_frequencies", "vehicle_journeys", name: "journey_frequencies_vehicle_journey_id_fk", on_delete: :nullify
add_foreign_key "journey_frequencies", "vehicle_journeys", on_delete: :nullify
- add_foreign_key "journey_pattern_sections", "journey_patterns", name: "journey_pattern_sections_journey_pattern_id_fk", on_delete: :cascade
add_foreign_key "journey_pattern_sections", "journey_patterns", on_delete: :cascade
- add_foreign_key "journey_pattern_sections", "route_sections", name: "journey_pattern_sections_route_section_id_fk", on_delete: :cascade
add_foreign_key "journey_pattern_sections", "route_sections", on_delete: :cascade
add_foreign_key "journey_patterns", "routes", name: "jp_route_fkey", on_delete: :cascade
add_foreign_key "journey_patterns", "stop_points", column: "arrival_stop_point_id", name: "arrival_point_fkey", on_delete: :nullify
diff --git a/spec/services/zip_service/zip_output_streams_spec.rb b/spec/services/zip_service/zip_output_streams_spec.rb
index 0b7a1e99b..b99e7bc2d 100644
--- a/spec/services/zip_service/zip_output_streams_spec.rb
+++ b/spec/services/zip_service/zip_output_streams_spec.rb
@@ -14,4 +14,8 @@ RSpec.describe ZipService do
expect( ref1_lines ).to eq %w(multiref/ref1/ multiref/ref1/datum-1 multiref/ref1/datum-2)
expect( ref2_lines ).to eq %w(multiref/ref2/ multiref/ref2/datum-1 multiref/ref2/datum-2)
end
+
+ it "exposes it's size" do
+ expect( subject.entry_group_streams.size ).to eq(2)
+ end
end
diff --git a/spec/workers/workbench_import_worker_spec.rb b/spec/workers/workbench_import_worker_spec.rb
index 68e429b60..b6057b36a 100644
--- a/spec/workers/workbench_import_worker_spec.rb
+++ b/spec/workers/workbench_import_worker_spec.rb
@@ -15,48 +15,104 @@ RSpec.describe WorkbenchImportWorker, type: [:worker, :request] do
let( :downloaded_zip ){ double("downloaded zip") }
let( :download_token ){ SecureRandom.urlsafe_base64 }
+
let( :upload_path ) { '/api/v1/netex_imports.json' }
let( :entry_group_streams ) do
- 2.times.map{ |i| double( "entry group stream #{i}" ) }
+ entry_count.times.map{ |i| double( "entry group stream #{i}" ) }
end
let( :entry_groups ) do
- 2.times.map do | i |
+ entry_count.times.map do | i |
{"entry_group_name#{i}" => entry_group_streams[i] }
end
end
let( :zip_service ){ double("zip service") }
+ let( :zip_file ){ File.join(fixture_path, 'multiref.zip') }
+
+ let( :post_response_ok ){ response(status: 201, boody: "{}") }
before do
+ # Silence Logger
+ allow_any_instance_of(Logger).to receive(:info)
+ allow_any_instance_of(Logger).to receive(:warn)
+
# That should be `build_stubbed's` job, no?
allow(Import).to receive(:find).with(import.id).and_return(import)
+
allow(Api::V1::ApiKey).to receive(:from).and_return(api_key)
allow(ZipService).to receive(:new).with(downloaded_zip).and_return zip_service
expect(zip_service).to receive(:entry_group_streams).and_return(entry_groups)
+ expect( import ).to receive(:update_attributes).with(status: 'running')
end
- context 'multireferential zipfile' do
- let( :zip_file ){ File.join(fixture_path, 'multiref.zip') }
+ context 'multireferential zipfile, no errors' do
+ let( :entry_count ){ 2 }
+
+ it 'downloads a zip file, cuts it, and uploads all pieces' do
- it 'downloads a zip file' do
expect(HTTPService).to receive(:get_resource)
.with(host: host, path: path, params: {token: download_token})
.and_return( downloaded_zip )
entry_groups.each do | entry_group_name, entry_group_stream |
- expect( HTTPService ).to receive(:post_resource)
- .with(host: host,
- path: upload_path,
- resource_name: 'netex_import',
- token: api_key.token,
- params: params,
- upload: {file: [entry_group_stream, 'application/zip', entry_group_name]})
+ mock_post entry_group_name, entry_group_stream, post_response_ok
+ end
+
+ expect( import ).to receive(:update_attributes).with(total_steps: 2)
+ expect( import ).to receive(:update_attributes).with(current_step: 1)
+ expect( import ).to receive(:update_attributes).with(current_step: 2)
+
+ worker.perform import.id
+
+ end
+ end
+
+ context 'multireferential zipfile with error' do
+ let( :entry_count ){ 3 }
+ let( :post_response_failure ){ response(status: 406, boody: {error: 'What was you thinking'}) }
+
+ it 'downloads a zip file, cuts it, and uploads some pieces' do
+ expect(HTTPService).to receive(:get_resource)
+ .with(host: host, path: path, params: {token: download_token})
+ .and_return( downloaded_zip )
+
+ # First entry_group succeeds
+ entry_groups[0..0].each do | entry_group_name, entry_group_stream |
+ mock_post entry_group_name, entry_group_stream, post_response_ok
end
+
+ # Second entry_group fails (M I S E R A B L Y)
+ entry_groups[1..1].each do | entry_group_name, entry_group_stream |
+ mock_post entry_group_name, entry_group_stream, post_response_failure
+ WorkbenchImportWorker::RETRY_DELAYS.each do | delay |
+ mock_post entry_group_name, entry_group_stream, post_response_failure
+ expect_any_instance_of(RetryService).to receive(:sleep).with(delay)
+ end
+ end
+
+ expect( import ).to receive(:update_attributes).with(total_steps: 3)
+ expect( import ).to receive(:update_attributes).with(current_step: 1)
+ expect( import ).to receive(:update_attributes).with(current_step: 2)
+ expect( import ).to receive(:update_attributes).with(current_step: 3, status: 'failed')
+
worker.perform import.id
end
end
+ def mock_post entry_group_name, entry_group_stream, response
+ expect( HTTPService ).to receive(:post_resource)
+ .with(host: host,
+ path: upload_path,
+ resource_name: 'netex_import',
+ token: api_key.token,
+ params: params,
+ upload: {file: [entry_group_stream, 'application/zip', entry_group_name]})
+ .and_return(response)
+ end
+ def response(**opts)
+ OpenStruct.new(opts)
+ end
end